当源表的一行中的多个列与目标表中单行的相同列匹配时,从目标 spark delta 表中删除一行
Posted
技术标签:
【中文标题】当源表的一行中的多个列与目标表中单行的相同列匹配时,从目标 spark delta 表中删除一行【英文标题】:Delete a row from target spark delta table when multiple columns in a row of source table matches with same columns of a single row in target table 【发布时间】:2020-06-08 16:48:32 【问题描述】:当行中的某些列值与源表中的相同列值匹配时,我想更新数据块中的目标 Delta 表。
问题是当我在源表中有多行与目标 Delta 表中的一行匹配时。
这是一种情况,源表中两行或多行的主键与增量表中一行的主键匹配。 我试图复制以下场景:
sql="""
MERGE INTO """ + targetTable + """ TGT USING """ + tempView + """ SRC ON TGT.id = SRC.id and TGT.category != SRC.category and TGT.startdt = SRC.startdt
WHEN MATCHED THEN DELETE"""
targetDF = spark.createDataFrame(["id": "5001","category": "N1","startDt": "2019-09-30 00:00:00.000"])
sourceDF = spark.createDataFrame(["id": "5001","category": "E1","startDt": "2019-09-30 00:00:00.000","id": "5001","category": "B1","startDt": "2019-09-30 00:00:00.000"])
targetDF.write.format("delta").mode("overwrite").saveAsTable("test.targetDF")
sourceDF.createOrReplaceTempView("tempView")
sqlOut=spark.sql(sql)
display(spark.sql("select * from test.targetDelta"))
我在两个表上尝试了左连接(targetTable left join sourceTable),其中我的 id 和 startDt 匹配,以在我的 targetTable 中获取一行,我想删除但不知道该怎么做。
spark.sql("""Select TGT.id from test.targetDF TGT left join tempView SRC ON TGT.id = SRC.id and TGT.startDt= SRC.startDt""")
提前致谢。
【问题讨论】:
【参考方案1】: package spark
import org.apache.spark.sql.SparkSession
object ap1 extends App
val spark = SparkSession.builder()
.master("local")
.appName("DataFrame-example")
.getOrCreate()
import spark.implicits._
case class D(id: String, category: String, startDt: String)
val targetDF = Seq(D("5001", "N1","2019-09-30 00:00:00.000"))
.toDF()
val sourceDF = Seq(D("5001", "E1", "2019-09-30 00:00:00.000"),
D("5001","B1","2019-09-30 00:00:00.000"))
.toDF()
val res = targetDF.join(sourceDF, targetDF.col("id") === sourceDF.col("id") &&
targetDF.col("startDt") === sourceDF.col("startDt") , "left_semi")
res.show(false)
// +----+--------+-----------------------+
// |id |category|startDt |
// +----+--------+-----------------------+
// |5001|N1 |2019-09-30 00:00:00.000|
// +----+--------+-----------------------+
【讨论】:
非常感谢。 left_semi join 完美运行。我将在目标表上添加 spark.sql 和 pyspark 版本的 Delete 操作【参考方案2】:我已经在 spark.sql 中执行了@mvasyliv 提供的答案,并在目标表中的行与源表中的多行匹配时添加了从目标表中删除行的操作。
spark.sql 版本
spark.sql("DELETE FROM MDM.targetDF a WHERE EXISTS(Select * from MDM.targetDF TGT left semi join tempView SRC ON TGT.id = SRC.id and TGT.startDt = SRC.startDt)").show()
Pyspark 版本:
from pyspark.sql.functions import *
sql="""
MERGE INTO """ + targetTable + """ TGT USING """ + tempView + """ SRC ON TGT.id = SRC.id and TGT.category != SRC.category and TGT.startdt = SRC.startdt
WHEN MATCHED THEN DELETE"""
finalDF = targetDF.join(sourceDF, ((targetDF.id == sourceDF.id) & (targetDF.startDt == sourceDF.startDt)), "left_semi")
finalDF.createOrReplaceTempView("tempView")
sqlOut=spark.sql(sql)
display(spark.sql("select * from test.targetDelta"))
【讨论】:
以上是关于当源表的一行中的多个列与目标表中单行的相同列匹配时,从目标 spark delta 表中删除一行的主要内容,如果未能解决你的问题,请参考以下文章