如何在 pyspark 中使用“不存在”的 SQL 条件?

Posted

技术标签:

【中文标题】如何在 pyspark 中使用“不存在”的 SQL 条件?【英文标题】:How can I use "where not exists" SQL condition in pyspark? 【发布时间】:2019-02-06 09:21:06 【问题描述】:

我在 Hive 上有一个表,我正在尝试在该表中插入数据。我正在从 SQL 中获取数据,但我不想插入 Hive 表中已经存在的 id。我正在尝试使用与不存在相同的条件。我在 Airflow 上使用 PySpark

【问题讨论】:

【参考方案1】:

exists 运算符在 Spark 中不存在,但有 2 个 join operators 可以替换它:left_antileft_semi

例如,如果您想在 hive 表 target 中插入数据框 df,您可以这样做:

new_df = df.join(
    spark.table("target"),
    how='left_anti',
    on='id'
)

然后你在你的表中写new_df

left_anti 允许您只保留不满足连接条件的行(相当于not exists)。 exists 的等价物是 left_semi

【讨论】:

我第一次看到这个。不需要做 df2 = spark.table('table') 我假设? 你当然可以先将spark.table()赋值给一个变量df2,但是如果你只在一个地方使用数据框就没有必要了。【参考方案2】:

您可以通过临时视图在数据帧上直接使用 spark SQL 使用 not exist

table_withNull_df.createOrReplaceTempView("table_withNull")
tblA_NoNull_df.createOrReplaceTempView("tblA_NoNull")

result_df = spark.sql("""
select * from table_withNull 
where not exists 
(select 1 from 
tblA_NoNull 
where table_withNull.id = tblA_NoNull.id)
""")

这种方法可以优先于左反连接,因为它们会导致意外的 BroadcastNestedLoopJoin 导致广播超时(即使没有在反连接中明确请求广播)。

之后你可以write.mode("append")插入之前没有遇到的数据。

示例取自here

【讨论】:

【参考方案3】:

恕我直言,我认为 Spark 中不存在这样的属性。我认为您可以使用两种方法:

    UNIQUE 条件的解决方法(典型的关系数据库):通过这种方式,当您尝试插入(在append 模式下)一个已经存在的记录时,您将得到一个您可以正确处理的异常处理。

    读取你要写入的表,outer join它与你要添加到上述表中的数据,然后将结果写入overwrite mode(但我认为第一个解决方案可能性能更好)。

更多详情欢迎咨询

【讨论】:

以上是关于如何在 pyspark 中使用“不存在”的 SQL 条件?的主要内容,如果未能解决你的问题,请参考以下文章

如何在火花中执行“不存在语句”

如何在pyspark sql中保存表?

如何在字典中使用 pyspark.sql.functions.when() 的多个条件?

如何在 pyspark.sql.functions.when() 中使用多个条件?

如何在 PySpark SQL when() 子句中使用聚合值?

pySpark.sql 如何使用 WHERE 关键字?