重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
本篇文章给大家分享的是有关如何进行数据湖deltalake中的时间旅行及版本管理,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
成都创新互联公司网站建设提供从项目策划、软件开发,软件安全维护、网站优化(SEO)、网站分析、效果评估等整套的建站服务,主营业务为成都网站建设、成都做网站,重庆APP软件开发以传统方式定制建设网站,并提供域名空间备案等一条龙服务,秉承以专业、用心的态度为用户提供真诚的服务。成都创新互联公司深信只要达到每一位用户的要求,就会得到认可,从而选择与我们长期合作。这样,我们也可以走得更远!
deltalake支持数据版本管理和时间旅行:提供了数据快照,使开发人员能够访问和还原早期版本的数据以进行审核、回滚或重新计算。
1.场景
delta lake的时间旅行,实际上就是利用多版本管理机制,查询历史的delta 表快照。时间旅行有以下使用案例:
1).可以重复创建数据分析,报告或者一些输出(比如,机器学习模型)。这主要是有利于调试和安全审查,尤其是在受管制的行业里。
2).编写复杂的基于时间的查询。
3).修正数据中的错误信息。
4).为一组查询提供快照隔离,以快速变更表。
2.配置
DataframeTable支持创建dataframe的时候指定一个delta lake表的版本信息:
val df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")val df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")
对于版本号,直接传入一个版本数值即可,如下:
val df2 = spark.read.format("delta").option("versionAsOf", 0).table(tableName)
对于timestamp字符串,必须要是date格式或者timestamp格式。例如:
val df1 = spark.read.format("delta").option("timestampAsOf", "2020-06-28").load("/delta/events")
val df1 = spark.read.format("delta").option("timestampAsOf", "2020-06-28T00:00:00.000Z").load("/delta/events")
由于delta lake的表是存在更新的情况,所以多次读取数据生成的dataframe之间会有差异,因为两次读取数据可能是一次是数据更新前,另一次是数据更新后。使用时间旅行你就可以在多次调用之间修复数据。
val latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/delta/events`)").collect()val df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/delta/events")
3.数据保存时间
默认情况下,deltalake保存最近30天的提交历史。这就意味着可以指定30天之前的版本来读取数据,但是有些注意事项:
3.1 没对delta 表调用VACUUM函数。VACUUM函数是用来删除不在引用的delta表和一些超过保留时间的表,支持sql和API形式。
slq表达式:
VACUUM eventsTable -- vacuum files not required by versions older than the default retention period
VACUUM '/data/events' -- vacuum files in path-based table
VACUUM delta.`/data/events/`
VACUUM delta.`/data/events/` RETAIN 100 HOURS -- vacuum files not required by versions more than 100 hours old
VACUUM eventsTable DRY RUN -- do dry run to get the list of files to be deleted
scala API 表达式
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
deltaTable.vacuum() // vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100) // vacuum files not required by versions more than 100 hours old
可以通过下面两个delta 表属性配置来
delta.logRetentionDuration =“ interval
delta.deletedFileRetentionDuration =“ interval
注意:VACUUM命令是不会删除日志文件的,日志文件是在checkpoint之后自动删除的。
为了读取之前版本的数据,必须要保留该版本的日志文件和数据文件。
4.案例
修复意外删除的用户111的数据。
INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
修复错误更新的数据
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
查询过去七天新增的消费者数:
SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
以上就是如何进行数据湖deltalake中的时间旅行及版本管理,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。