Oracle MERGE 重写为 PySpark。如果为空 - 更新,否则 - 插入
Posted
技术标签:
【中文标题】Oracle MERGE 重写为 PySpark。如果为空 - 更新,否则 - 插入【英文标题】:Oracle MERGE rewritten to PySpark. If null - update, otherwise - insert 【发布时间】:2021-02-11 14:06:01 【问题描述】:这些是我的桌子:destination
new_data
在 Oracle SQL 中我可以这样做:
MERGE INTO destination d
USING new_data n
ON (d.c1 = n.c1 AND d.c2 = n.c2)
WHEN MATCHED THEN
UPDATE SET d.d1 = n.d1
WHERE d.d1 IS NULL
WHEN NOT MATCHED THEN
INSERT (c1, c2, d1)
VALUES (n.c1, n.c2, n.d1);
然后destination
表变成这样:
如果c1
、c2
存在于destination
中并且d1
为空,则d1
将被更新。
如果c1
、c2
不存在,则插入行。
有没有办法在 PySpark 中做同样的事情?
这会生成数据帧:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
dCols = ['c1', 'c2', 'd1']
dData = [('a', 'b', 5),
('c', 'd', None)]
destination = spark.createDataFrame(dData, dCols)
nData = [('a', 'b', 1),
('c', 'd', 6),
('e', 'f', 7),
('g', 'h', None)]
new_data = spark.createDataFrame(nData, dCols)
在 PySpark 中几乎包含 SQL 中的所有内容。但我找不到 MERGE
的等价物。
【问题讨论】:
【参考方案1】:在SQL中,MERGE
可以替换为left join union right join全外连接:
merged = destination.alias("dest").join(new_data.alias("src"), ["c1", "c2"], "full") \
.selectExpr("c1", "c2", "coalesce(dest.d1, src.d1) as d1")
merged.show()
#+---+---+----+
#| c1| c2| d1|
#+---+---+----+
#| e| f| 7|
#| g| h|null|
#| c| d| 6|
#| a| b| 5|
#+---+---+----+
但是,每次执行此合并时,您都需要将所有数据重写到目标中,因为 Spark 不支持更新,这可能会导致性能下降。因此,如果您确实需要这样做,我建议您查看Delta Lake,它会带来 ACID 要触发的事务,并且支持MERGE 语法。
【讨论】:
【参考方案2】:您可以使用coalesce
进行左连接和合并列
import pyspark.sql.functions as F
result = new_data.alias('t1').join(
destination.alias('t2'),
['c1', 'c2'],
'full'
).select('c1', 'c2', F.coalesce('t2.d1', 't1.d1').alias('d1'))
result.show()
+---+---+----+
| c1| c2| d1|
+---+---+----+
| e| f| 7|
| g| h|null|
| c| d| 6|
| a| b| 5|
+---+---+----+
【讨论】:
以上是关于Oracle MERGE 重写为 PySpark。如果为空 - 更新,否则 - 插入的主要内容,如果未能解决你的问题,请参考以下文章
PySpark:当通过 JDBC 在 Oracle 中创建表时,为啥我会得到“没有为类 oracle.jdbc.driver.T4CRowidAccessor 实现 getLong”?
PySpark:从 Oracle 表中选择一个值,然后添加到它