WriteStream 无法在 Delta 表中写入数据

Posted

技术标签:

【中文标题】WriteStream 无法在 Delta 表中写入数据【英文标题】:WriteStream is not able to write Data in Delta Table 【发布时间】:2021-07-21 12:20:18 【问题描述】:

我正在尝试使用以下代码从流式传输路径连接流式传输 Json 文件

Schema1= "customerId STRING,orderId STRING,products ARRAY<STRUCT<productId: STRING,quantity: STRING,soldPrice: STRING>>,salesRepId STRING,shippingAddress STRUCT<address: STRING,attention: String,city: STRING,state: STRING,zip: STRING>,submittedAt TIMESTAMP";
streamingDF = (spark.readStream.schema(Schema1)\
  .option("maxFilesPerTrigger", 1).json(stream_path))

在 streamingDF Streaming Dataset 中进行几次转换并尝试使用以下代码写入 Delta 表

streamingDF.writeStream.outputMode("append")\
  .option("checkpointLocation", orders_checkpoint_path)\
  .partitionBy("submitted_yyyy_mm")\
  .table("sachin")

但是这些记录没有插入到我们的增量表中,而且当我检查仪表板时,它显示 numInputRows 为 0

Screenshot of streaming while writestream being executed

为什么这些记录没有追加到增量表中?

【问题讨论】:

根据图表,您没有任何输入数据 尝试删除检查点文件夹并再次运行。 dbutils.fs.rm(orders_checkpoint_path, True) 是的,当我们在删除检查点路径后运行它时它会起作用 【参考方案1】:

而不是使用 .table() 使用 .start() 然后路径表路径而不是表名:

   streamingDF.writeStream.outputMode("append")
  .option("checkpointLocation", orders_checkpoint_path)
  .partitionBy("submitted_yyyy_mm")
  .start("/pathtotable/sachin")

【讨论】:

以上是关于WriteStream 无法在 Delta 表中写入数据的主要内容,如果未能解决你的问题,请参考以下文章

Angular 9在Build上出错:找不到WriteStream & ReadStream这个名字。

在 csv 与 delta 表中使用 df.coalesce(1)

当源表的一行中的多个列与目标表中单行的相同列匹配时,从目标 spark delta 表中删除一行

Spark 从另一个表更新 Delta 中的多个列

如何在单个 Spark 作业中调用多个 writeStream 操作?

如何在 writeStream 中访问数组类型中的元素?