Databricks - 从增量表写入流到 orc 文件的读取流仅具有更改
Posted
技术标签:
【中文标题】Databricks - 从增量表写入流到 orc 文件的读取流仅具有更改【英文标题】:Databricks - readstream from delta table writestream to orc file only with changes 【发布时间】:2021-10-29 22:41:04 【问题描述】:管道每 20 分钟运行一次,以 ORC 格式将数据推送到 ADLS Gen2 存储。
-
我有一个每 1 小时运行一次的 Azure Databricks 笔记本作业。
此作业从 ADLS 读取 orc 文件作为结构化流(由上述管道创建的 orc 文件),然后使用合并功能根据 primaryKey 列将数据更新到增量表。
event_readstream = (
spark.readStream
.format("orc")
.schema('my-schema.json')
.load('/mnt/path/from/adls/to/orc/files/')
)
...
def upsertEventToDeltaTable(df, batch_id):
input_df = (
df
.drop('address_line3')
.dropDuplicates(['primaryKey'])
.withColumn(partition_column, F.date_format(F.col('updateddate'), 'yyyy').cast('int'))
)
input_df.createOrReplaceTempView("event_df_updates")
input_df._jdf.sparkSession().sql("""
MERGE INTO events dbEvent
USING event_df_updates dfEvent
ON dbEvent.primaryKey = dfEvent.primaryKey
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
event_writestream = (
event_readStream
.drop('adddress_line3') #some column to be dropped
.repartition(1)
.writeStream
.trigger(once=True)
.format("delta")
.option("checkpointLocation", "/checkpoints/".format(adls_mount_path,mytable))
.foreachBatch(upsertEventToDeltaTable)
.start()
)
-
同一笔记本还使用读取流(结构化流)并将数据直接写入 ADLS Gen2 存储中的不同位置。这也使用了 writestream 中的
forEachBatch()
并启用了 checkpoint
选项。
def loadToLocation(df, batch_id):
(
df
.repartition(1)
.write
.partitionBy('updateddate')
.format("orc")
.mode("append")
.save('/event/data'.format(adls_mount_path))
)
location_writestream = (
event_readstream # Same read stream is used here
.writeStream
.trigger(once=True)
.option("checkpointLocation", "/checkpoints/event".format(adls_mount_path))
.foreachBatch(loadToLocation)
.start()
)
问题:
在我的情况下,Delta 表每 1 小时更新一次数据,如果我创建新的 readstream(从 delta table 读取)和 writestream(写入 ORC 文件)。 ORC 文件是否仅包含在 delta 表中合并的更改? [详情如下] 如果仅将更改或更新的数据写入 ORC 文件,这种方法是否存在任何问题?在上面的第 2 点中,不要使用 readStream(从 orc 文件读取),而是使用如下所示的 Delta 表路径创建一个新的 readStream
deltatbl_event_readstream = spark.readStream.format("delta")
.load("/mnt/delta/myadlsaccnt/user_events") # my delta table location
并使用不同的写入流,如下所示
def loadToLocation(df, batch_id):
(
df
.repartition(1)
.write
.partitionBy('updateddate')
.format("orc")
.mode("append")
.save('/event/data'.format(adls_mount_path))
)
deltatbl_event_readstream
.writeStream
.trigger(once=True)
.option("checkpointLocation", "/checkpoints/event".format(adls_mount_path))
.foreachBatch(loadToLocation) # re-using the same method.
.start()
【问题讨论】:
【参考方案1】:如果您只是在 Delta 表上使用纯 readStream
而没有任何选项,那么您将不会获得有关更新的信息。实际上,更新后流将失败,直到您设置选项ignoreChanges
。这是因为 Delta 不跟踪更改,并且当您进行更新/删除时,它正在重写现有文件,因此通过查看文件您只会看到数据,而不知道它是插入还是更新。
但是如果您需要从 Delta 流式传输更改,那么您可以使用 Delta 8.4 中引入的Delta Change Data Feed (CDF) 功能(如果我没记错的话)。要使其工作,您需要通过将属性delta.enableChangeDataFeed
设置为true
在源增量表上启用它。从那个版本开始,您将能够阅读更改的提要,如下所示:
deltatbl_event_readstream = spark.readStream.format("delta")\
.option("readChangeFeed", "true") \
.option("startingVersion", <version_of_delta_when_you_enable_cdf>) \
.load("...")
这将添加三个额外的列来描述所执行的操作、Delta 的版本和时间戳。如果您只需要跟踪更改,则只需选择_change_type
列的值为update_postimage
的行,然后您可以将该数据存储在您需要的任何位置。
但请注意,在表上启用 CDF 后,其他客户端(DBR
【讨论】:
【参考方案2】:我遇到了链接DATA+AI summit,其中包含此类场景的演示。
就我而言,每批都有 >90% 的新行,更新较少。所以我不能使用这个选项。这可能对其他人有所帮助。
以下类似于 Alex Ott 的回答,添加了附加信息
根据建议,如果批量更新更多,CDF 可能无效。
启用 CDF 功能:%sql
ALTER table <table-name> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
对表执行任何更新/插入操作
使用table_changes()
函数查看变化
%sql
select * from table_changes('<table-name>',<start-commit-version>,<end-commit-version>
读取为流
event_read_stream = spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "latest")
.table("event") #// table name
.filter("_change_type != 'update_preimage'")
创建合并更改的 upsert 函数
写入流以写入信息
event_read_stream.writeStream.format("delta")
.trigger(processingTime = "2 seconds") # if in case if job use once
.outputMode("append")
.start()
【讨论】:
以上是关于Databricks - 从增量表写入流到 orc 文件的读取流仅具有更改的主要内容,如果未能解决你的问题,请参考以下文章
Databricks 增量表与 SQL Server 增量表