如何使用替换 Where 子句实现以下火花行为
Posted
技术标签:
【中文标题】如何使用替换 Where 子句实现以下火花行为【英文标题】:How can I acheive following spark behaviour using replaceWhere clause 【发布时间】:2021-08-13 10:28:40 【问题描述】:我想在替换(覆盖)接收器中已经存在的分区的同时增量地在增量表中写入数据。例子: 考虑一下我的增量表中的这些数据已经被 id 列分区:
+---+---+
| id| x|
+---+---+
| 1| A|
| 2| B|
| 3| C|
+---+---+
现在,我想插入以下数据框:
+---+---------+
| id| x|
+---+---------+
| 2| NEW|
| 2| NEW|
| 4| D|
| 5| E|
+---+---------+
想要的输出是这样的
+---+---------+
| id| x|
+---+---------+
| 1| A|
| 2| NEW|
| 2| NEW|
| 3| C|
| 4| D|
| 5| E|
+---+---------+
我所做的是:
df = spark.read.format("csv").option("sep", ";").option("header", "true").load("/mnt/blob/datafinance/bronze/simba/test/in/input.csv")
Ids=[x.id for x in df.select("id").distinct().collect()]
for Id in Ids:
df.filter(df.id==Id).write.format("delta").option("mergeSchema", "true").partitionBy("id").option("replaceWhere", "id == '$i'".format(i=Id)).mode("append").save("/mnt/blob/datafinance/bronze/simba/test/res/")
spark.read.format("delta").option("sep", ";").option("header", "true").load("/mnt/blob/datafinance/bronze/simba/test/res/").show()
结果如下:
+---+---------+
| id| x|
+---+---------+
| 2| B|
| 1| A|
| 5| E|
| 2| NEW|
| 2|NEW AUSSI|
| 3| C|
| 4| D|
+---+---------+
如您所见,它附加了所有值,而没有替换表中已经存在的分区 id=2。
我认为是因为mode("append")
。
但是将其更改为mode("overwrite")
会引发以下错误:
Data written out does not match replaceWhere 'id == '$i''.
谁能告诉我如何实现我想要的?
谢谢。
【问题讨论】:
我认为你应该使用dynamic overwrite option。我回答了replaceWhere option,也许你能理解为什么你的流程没有按预期工作 我的代码不适合这种用例吗?你能把你的方法应用到我的案子上吗?对我来说似乎是同样的事情。 动态覆盖不需要过滤,它只是df.write.save('path', format='delta', mode='overwrite')
,Spark 会为您完成工作。 replaceWhere
在处理日期分区或范围值时可能很有用
所以我用spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") and df.write.partitionBy(PartitionKey).save('/mnt/blob/datafinance/bronze/simba/test/res/', format='delta', mode='overwrite')
替换了df.filter(df.id==Id).write.format("delta").option("mergeSchema", "true").partitionBy("id").option("replaceWhere", "id == '$i'".format(i=Id)).mode("append").save("/mnt/blob/datafinance/bronze/simba/test/res/")
,它覆盖了整个数据帧,而不仅仅是id=2 分区
这里是删除更新的情况
【参考方案1】:
我实际上在代码中有一个错误。我换了
.option("replaceWhere", "id == '$i'".format(i=idd))
与
.option("replaceWhere", "id == 'i'".format(i=idd))
它成功了。
感谢@ggordon
注意到我另一个问题的错误。
【讨论】:
以上是关于如何使用替换 Where 子句实现以下火花行为的主要内容,如果未能解决你的问题,请参考以下文章
如果 where 子句已经修复,如何加快 spark sql 过滤器查询?
如何在WHERE子句中使用MS Access SQL子查询来替换长OR表达式