Oracle MERGE 重写为 PySpark。如果为空 - 更新,否则 - 插入

Posted

技术标签:

【中文标题】Oracle MERGE 重写为 PySpark。如果为空 - 更新,否则 - 插入【英文标题】:Oracle MERGE rewritten to PySpark. If null - update, otherwise - insert 【发布时间】:2021-02-11 14:06:01 【问题描述】:

这些是我的桌子:destinationnew_data

在 Oracle SQL 中我可以这样做:

MERGE INTO destination d
    USING new_data n
    ON (d.c1 = n.c1 AND d.c2 = n.c2)
  WHEN MATCHED THEN
    UPDATE SET d.d1 = n.d1
         WHERE d.d1 IS NULL
  WHEN NOT MATCHED THEN
    INSERT (c1, c2, d1)
    VALUES (n.c1, n.c2, n.d1);

然后destination 表变成这样:

如果c1c2 存在于destination 中并且d1 为空,则d1 将被更新。 如果c1c2 不存在,则插入行。

有没有办法在 PySpark 中做同样的事情?

这会生成数据帧:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dCols = ['c1', 'c2', 'd1']
dData = [('a', 'b', 5), 
         ('c', 'd', None)]
destination = spark.createDataFrame(dData, dCols)

nData = [('a', 'b', 1),
         ('c', 'd', 6),
         ('e', 'f', 7),
         ('g', 'h', None)]
new_data = spark.createDataFrame(nData, dCols)

在 PySpark 中几乎包含 SQL 中的所有内容。但我找不到 MERGE 的等价物。

【问题讨论】:

【参考方案1】:

在SQL中,MERGE可以替换为left join union right join全外连接:

merged = destination.alias("dest").join(new_data.alias("src"), ["c1", "c2"], "full") \
    .selectExpr("c1", "c2", "coalesce(dest.d1, src.d1) as d1")

merged.show()

#+---+---+----+
#| c1| c2|  d1|
#+---+---+----+
#|  e|  f|   7|
#|  g|  h|null|
#|  c|  d|   6|
#|  a|  b|   5|
#+---+---+----+

但是,每次执行此合并时,您都需要将所有数据重写到目标中,因为 Spark 不支持更新,这可能会导致性能下降。因此,如果您确实需要这样做,我建议您查看Delta Lake,它会带来 ACID 要触发的事务,并且支持MERGE 语法。

【讨论】:

【参考方案2】:

您可以使用coalesce 进行左连接和合并列

import pyspark.sql.functions as F

result = new_data.alias('t1').join(
    destination.alias('t2'),
    ['c1', 'c2'],
    'full'
).select('c1', 'c2', F.coalesce('t2.d1', 't1.d1').alias('d1'))

result.show()
+---+---+----+
| c1| c2|  d1|
+---+---+----+
|  e|  f|   7|
|  g|  h|null|
|  c|  d|   6|
|  a|  b|   5|
+---+---+----+

【讨论】:

以上是关于Oracle MERGE 重写为 PySpark。如果为空 - 更新,否则 - 插入的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:当通过 JDBC 在 Oracle 中创建表时,为啥我会得到“没有为类 oracle.jdbc.driver.T4CRowidAccessor 实现 getLong”?

关于熊猫代码的pyspark meandays计算

PySpark:从 Oracle 表中选择一个值,然后添加到它

ORACLE11g update和merge into 的区别

Oracle存在则更新,不存在则插入应用-merge

在oracle中使用merge into实现更新和插入数据