如何使用替换 Where 子句实现以下火花行为

Posted

技术标签:

【中文标题】如何使用替换 Where 子句实现以下火花行为【英文标题】:How can I acheive following spark behaviour using replaceWhere clause 【发布时间】:2021-08-13 10:28:40 【问题描述】:

我想在替换(覆盖)接收器中已经存在的分区的同时增量地在增量表中写入数据。例子: 考虑一下我的增量表中的这些数据已经被 id 列分区:

+---+---+
| id|  x|
+---+---+
|  1|  A|
|  2|  B|
|  3|  C|
+---+---+

现在,我想插入以下数据框:

+---+---------+
| id|        x|
+---+---------+
|  2|      NEW|
|  2|      NEW|
|  4|        D|
|  5|        E|
+---+---------+

想要的输出是这样的

+---+---------+
| id|        x|
+---+---------+
|  1|        A|
|  2|      NEW|
|  2|      NEW|
|  3|        C|
|  4|        D|
|  5|        E|
+---+---------+

我所做的是:

df = spark.read.format("csv").option("sep", ";").option("header", "true").load("/mnt/blob/datafinance/bronze/simba/test/in/input.csv")
Ids=[x.id for x in df.select("id").distinct().collect()]
for Id in Ids:
  df.filter(df.id==Id).write.format("delta").option("mergeSchema", "true").partitionBy("id").option("replaceWhere", "id == '$i'".format(i=Id)).mode("append").save("/mnt/blob/datafinance/bronze/simba/test/res/")
spark.read.format("delta").option("sep", ";").option("header", "true").load("/mnt/blob/datafinance/bronze/simba/test/res/").show()

结果如下:

+---+---------+
| id|        x|
+---+---------+
|  2|        B|
|  1|        A|
|  5|        E|
|  2|      NEW|
|  2|NEW AUSSI|
|  3|        C|
|  4|        D|
+---+---------+

如您所见,它附加了所有值,而没有替换表中已经存在的分区 id=2。

我认为是因为mode("append")。 但是将其更改为mode("overwrite") 会引发以下错误:

Data written out does not match replaceWhere 'id == '$i''.

谁能告诉我如何实现我想要的?

谢谢。

【问题讨论】:

我认为你应该使用dynamic overwrite option。我回答了replaceWhere option,也许你能理解为什么你的流程没有按预期工作 我的代码不适合这种用例吗?你能把你的方法应用到我的案子上吗?对我来说似乎是同样的事情。 动态覆盖不需要过滤,它只是df.write.save('path', format='delta', mode='overwrite'),Spark 会为您完成工作。 replaceWhere 在处理日期分区或范围值时可能很有用 所以我用spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") and df.write.partitionBy(PartitionKey).save('/mnt/blob/datafinance/bronze/simba/test/res/', format='delta', mode='overwrite') 替换了df.filter(df.id==Id).write.format("delta").option("mergeSchema", "true").partitionBy("id").option("replaceWhere", "id == '$i'".format(i=Id)).mode("append").save("/mnt/blob/datafinance/bronze/simba/test/res/"),它覆盖了整个数据帧,而不仅仅是id=2 分区 这里是删除更新的情况 【参考方案1】:

我实际上在代码中有一个错误。我换了

.option("replaceWhere", "id == '$i'".format(i=idd))

.option("replaceWhere", "id == 'i'".format(i=idd))

它成功了。

感谢@ggordon 注意到我另一个问题的错误。

【讨论】:

以上是关于如何使用替换 Where 子句实现以下火花行为的主要内容,如果未能解决你的问题,请参考以下文章

在 where 子句中替换动态运算符而不是手动运算符

JDBC WHERE子句条件实例

如果 where 子句已经修复,如何加快 spark sql 过滤器查询?

如何在WHERE子句中使用MS Access SQL子查询来替换长OR表达式

如何在光标的 select 语句 where 子句中传递逗号分隔值

如何在火花的过滤条件中使用NOT IN子句