Databricks - 从增量表写入流到 orc 文件的读取流仅具有更改

Posted

技术标签:

【中文标题】Databricks - 从增量表写入流到 orc 文件的读取流仅具有更改【英文标题】:Databricks - readstream from delta table writestream to orc file only with changes 【发布时间】:2021-10-29 22:41:04 【问题描述】:

管道每 20 分钟运行一次,以 ORC 格式将数据推送到 ADLS Gen2 存储。

    我有一个每 1 小时运行一次的 Azure Databricks 笔记本作业。 此作业从 ADLS 读取 orc 文件作为结构化流(由上述管道创建的 orc 文件),然后使用合并功能根据 primaryKey 列将数据更新到增量表。
event_readstream = ( 
    spark.readStream
    .format("orc")
    .schema('my-schema.json')
    .load('/mnt/path/from/adls/to/orc/files/')
  )
  ...

def upsertEventToDeltaTable(df, batch_id): 
  input_df = (
    df
    .drop('address_line3')
    .dropDuplicates(['primaryKey'])
    .withColumn(partition_column, F.date_format(F.col('updateddate'), 'yyyy').cast('int'))
  )
  
  input_df.createOrReplaceTempView("event_df_updates")
  
  input_df._jdf.sparkSession().sql("""
    MERGE INTO events dbEvent
    USING event_df_updates dfEvent
    ON dbEvent.primaryKey = dfEvent.primaryKey
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *                     
                             """)

event_writestream = ( 
    event_readStream
    .drop('adddress_line3')  #some column to be dropped
    .repartition(1)
    .writeStream
    .trigger(once=True)
    .format("delta")
    .option("checkpointLocation", "/checkpoints/".format(adls_mount_path,mytable))
    .foreachBatch(upsertEventToDeltaTable)
    .start()
  )
    同一笔记本还使用读取流(结构化流)并将数据直接写入 ADLS Gen2 存储中的不同位置。这也使用了 writestream 中的 forEachBatch() 并启用了 checkpoint 选项。

def loadToLocation(df, batch_id): 
  (
    df
    .repartition(1)
    .write
    .partitionBy('updateddate')
    .format("orc")
    .mode("append")
    .save('/event/data'.format(adls_mount_path))  
  )

location_writestream = ( 
  event_readstream   # Same read stream is used here 
  .writeStream
  .trigger(once=True)
  .option("checkpointLocation", "/checkpoints/event".format(adls_mount_path))
  .foreachBatch(loadToLocation)
  .start()
)

问题:

在我的情况下,Delta 表每 1 小时更新一次数据,如果我创建新的 readstream(从 delta table 读取)和 writestream(写入 ORC 文件)。 ORC 文件是否仅包含在 delta 表中合并的更改? [详情如下] 如果仅将更改或更新的数据写入 ORC 文件,这种方法是否存在任何问题?

在上面的第 2 点中,不要使用 readStream(从 orc 文件读取),而是使用如下所示的 Delta 表路径创建一个新的 readStream

deltatbl_event_readstream = spark.readStream.format("delta")
  .load("/mnt/delta/myadlsaccnt/user_events")  # my delta table location
并使用不同的写入流,如下所示
def loadToLocation(df, batch_id): 
  (
    df
    .repartition(1)
    .write
    .partitionBy('updateddate')
    .format("orc")
    .mode("append")
    .save('/event/data'.format(adls_mount_path))  
  )

deltatbl_event_readstream
   .writeStream
  .trigger(once=True)
  .option("checkpointLocation", "/checkpoints/event".format(adls_mount_path))
  .foreachBatch(loadToLocation)  # re-using the same method.
  .start()

【问题讨论】:

【参考方案1】:

如果您只是在 Delta 表上使用纯 readStream 而没有任何选项,那么您将不会获得有关更新的信息。实际上,更新后流将失败,直到您设置选项ignoreChanges。这是因为 Delta 不跟踪更改,并且当您进行更新/删除时,它正在重写现有文件,因此通过查看文件您只会看到数据,而不知道它是插入还是更新。

但是如果您需要从 Delta 流式传输更改,那么您可以使用 Delta 8.4 中引入的Delta Change Data Feed (CDF) 功能(如果我没记错的话)。要使其工作,您需要通过将属性delta.enableChangeDataFeed 设置为true 在源增量表上启用它。从那个版本开始,您将能够阅读更改的提要,如下所示:

deltatbl_event_readstream = spark.readStream.format("delta")\
  .option("readChangeFeed", "true") \
  .option("startingVersion", <version_of_delta_when_you_enable_cdf>) \
  .load("...")

这将添加三个额外的列来描述所执行的操作、Delta 的版本和时间戳。如果您只需要跟踪更改,则只需选择_change_type 列的值为update_postimage 的行,然后您可以将该数据存储在您需要的任何位置。

但请注意,在表上启用 CDF 后,其他客户端(DBR

【讨论】:

【参考方案2】:

我遇到了链接DATA+AI summit,其中包含此类场景的演示。

就我而言,每批都有 >90% 的新行,更新较少。所以我不能使用这个选项。这可能对其他人有所帮助。

以下类似于 Alex Ott 的回答,添加了附加信息

根据建议,如果批量更新更多,CDF 可能无效。

启用 CDF 功能:
%sql
ALTER table <table-name> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
对表执行任何更新/插入操作 使用table_changes()函数查看变化
%sql 
select * from table_changes('<table-name>',<start-commit-version>,<end-commit-version>
读取为流
event_read_stream = spark.readStream
     .format("delta")
     .option("readChangeFeed", "true")
     .option("startingVersion", "latest")
     .table("event") #// table name 
     .filter("_change_type != 'update_preimage'")
创建合并更改的 upsert 函数 写入流以写入信息
event_read_stream.writeStream.format("delta")
  .trigger(processingTime = "2 seconds") # if in case if job use once
  .outputMode("append")
  .start()

【讨论】:

以上是关于Databricks - 从增量表写入流到 orc 文件的读取流仅具有更改的主要内容,如果未能解决你的问题,请参考以下文章

Databricks 更新表不适用于 orc 格式

平面文件(orc,csv)比火花中的增量表更有效吗

Databricks 增量表与 SQL Server 增量表

使用python截断Databricks中的增量表

将 Azure Databricks 增量表迁移到 Azure Synapse SQL 池

Databricks 无服务器计算机 - 写回增量表