WriteStream 无法在 Delta 表中写入数据

Posted

技术标签:

【中文标题】WriteStream 无法在 Delta 表中写入数据【英文标题】:WriteStream is not able to write Data in Delta Table 【发布时间】:2021-07-21 12:20:18 【问题描述】:

我正在尝试使用以下代码从流式传输路径连接流式传输 Json 文件

Schema1= "customerId STRING,orderId STRING,products ARRAY<STRUCT<productId: STRING,quantity: STRING,soldPrice: STRING>>,salesRepId STRING,shippingAddress STRUCT<address: STRING,attention: String,city: STRING,state: STRING,zip: STRING>,submittedAt TIMESTAMP";
streamingDF = (spark.readStream.schema(Schema1)\
  .option("maxFilesPerTrigger", 1).json(stream_path))

在 streamingDF Streaming Dataset 中进行几次转换并尝试使用以下代码写入 Delta 表

streamingDF.writeStream.outputMode("append")\
  .option("checkpointLocation", orders_checkpoint_path)\
  .partitionBy("submitted_yyyy_mm")\
  .table("sachin")

但是这些记录没有插入到我们的增量表中,而且当我检查仪表板时,它显示 numInputRows 为 0

Screenshot of streaming while writestream being executed

为什么这些记录没有追加到增量表中?

【问题讨论】:

根据图表,您没有任何输入数据 尝试删除检查点文件夹并再次运行。 dbutils.fs.rm(orders_checkpoint_path, True) 是的,当我们在删除检查点路径后运行它时它会起作用 【参考方案1】:

而不是使用 .table() 使用 .start() 然后路径表路径而不是表名:

   streamingDF.writeStream.outputMode("append")
  .option("checkpointLocation", orders_checkpoint_path)
  .partitionBy("submitted_yyyy_mm")
  .start("/pathtotable/sachin")

【讨论】:

以上是关于WriteStream 无法在 Delta 表中写入数据的主要内容,如果未能解决你的问题,请参考以下文章