当源表的一行中的多个列与目标表中单行的相同列匹配时,从目标 spark delta 表中删除一行

Posted

技术标签:

【中文标题】当源表的一行中的多个列与目标表中单行的相同列匹配时,从目标 spark delta 表中删除一行【英文标题】:Delete a row from target spark delta table when multiple columns in a row of source table matches with same columns of a single row in target table 【发布时间】:2020-06-08 16:48:32 【问题描述】:

当行中的某些列值与源表中的相同列值匹配时,我想更新数据块中的目标 Delta 表。

问题是当我在源表中有多行与目标 Delta 表中的一行匹配时。

这是一种情况,源表中两行或多行的主键与增量表中一行的主键匹配。 我试图复制以下场景:

    sql="""
    MERGE INTO """ + targetTable + """ TGT USING """ + tempView + """ SRC ON TGT.id = SRC.id and TGT.category != SRC.category and TGT.startdt = SRC.startdt
    WHEN MATCHED THEN DELETE""" 

    targetDF = spark.createDataFrame(["id": "5001","category": "N1","startDt": "2019-09-30 00:00:00.000"])
    sourceDF = spark.createDataFrame(["id": "5001","category": "E1","startDt": "2019-09-30 00:00:00.000","id": "5001","category": "B1","startDt": "2019-09-30 00:00:00.000"])
    targetDF.write.format("delta").mode("overwrite").saveAsTable("test.targetDF")
    sourceDF.createOrReplaceTempView("tempView")

    sqlOut=spark.sql(sql)
    display(spark.sql("select * from test.targetDelta"))

我在两个表上尝试了左连接(targetTable left join sourceTable),其中我的 id 和 startDt 匹配,以在我的 targetTable 中获取一行,我想删除但不知道该怎么做。

    spark.sql("""Select TGT.id from test.targetDF TGT left join  tempView  SRC ON TGT.id = SRC.id and TGT.startDt= SRC.startDt""")

提前致谢。

【问题讨论】:

【参考方案1】:
    package spark

import org.apache.spark.sql.SparkSession

object ap1 extends App 
  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  case class D(id: String, category: String, startDt: String)

  val targetDF = Seq(D("5001", "N1","2019-09-30 00:00:00.000"))
  .toDF()
  val sourceDF = Seq(D("5001", "E1", "2019-09-30 00:00:00.000"),
    D("5001","B1","2019-09-30 00:00:00.000"))
    .toDF()


  val res = targetDF.join(sourceDF, targetDF.col("id") === sourceDF.col("id") &&
    targetDF.col("startDt") === sourceDF.col("startDt")  , "left_semi")

  res.show(false)
//  +----+--------+-----------------------+
//  |id  |category|startDt                |
//  +----+--------+-----------------------+
//  |5001|N1      |2019-09-30 00:00:00.000|
//  +----+--------+-----------------------+


【讨论】:

非常感谢。 left_semi join 完美运行。我将在目标表上添加 spark.sql 和 pyspark 版本的 Delete 操作【参考方案2】:

我已经在 spark.sql 中执行了@mvasyliv 提供的答案,并在目标表中的行与源表中的多行匹配时添加了从目标表中删除行的操作。

spark.sql 版本

   spark.sql("DELETE FROM MDM.targetDF a WHERE EXISTS(Select * from MDM.targetDF TGT left semi join  tempView  SRC ON TGT.id = SRC.id and TGT.startDt = SRC.startDt)").show()

Pyspark 版本:

    from pyspark.sql.functions import *

    sql="""
    MERGE INTO """ + targetTable + """ TGT USING """ + tempView + """ SRC ON TGT.id = SRC.id and TGT.category != SRC.category and TGT.startdt = SRC.startdt
    WHEN MATCHED THEN DELETE""" 

    finalDF = targetDF.join(sourceDF, ((targetDF.id == sourceDF.id) & (targetDF.startDt == sourceDF.startDt)), "left_semi")

    finalDF.createOrReplaceTempView("tempView")

    sqlOut=spark.sql(sql)
    display(spark.sql("select * from test.targetDelta"))

【讨论】:

以上是关于当源表的一行中的多个列与目标表中单行的相同列匹配时,从目标 spark delta 表中删除一行的主要内容,如果未能解决你的问题,请参考以下文章

SQL Server字符串或二进制数据将被截断

将表中的列与 hive 中另一个表的列进行比较

如果任何列与另一个表中的匹配行不同,如何插入行

当源表中不存在相关行时更新

即使满足条件并且即使目标表和源表中的字段已经存在,雪花合并也会添加数据

Redshift:当源表中有自动排序键时,创建表失败