如何在 pyspark 中使用“不存在”的 SQL 条件?
Posted
技术标签:
【中文标题】如何在 pyspark 中使用“不存在”的 SQL 条件?【英文标题】:How can I use "where not exists" SQL condition in pyspark? 【发布时间】:2019-02-06 09:21:06 【问题描述】:我在 Hive 上有一个表,我正在尝试在该表中插入数据。我正在从 SQL 中获取数据,但我不想插入 Hive 表中已经存在的 id。我正在尝试使用与不存在相同的条件。我在 Airflow 上使用 PySpark。
【问题讨论】:
【参考方案1】:exists
运算符在 Spark 中不存在,但有 2 个 join operators 可以替换它:left_anti
和 left_semi
。
例如,如果您想在 hive 表 target
中插入数据框 df
,您可以这样做:
new_df = df.join(
spark.table("target"),
how='left_anti',
on='id'
)
然后你在你的表中写new_df
。
left_anti
允许您只保留不满足连接条件的行(相当于not exists
)。 exists
的等价物是 left_semi
。
【讨论】:
我第一次看到这个。不需要做 df2 = spark.table('table') 我假设? 你当然可以先将spark.table()
赋值给一个变量df2,但是如果你只在一个地方使用数据框就没有必要了。【参考方案2】:
您可以通过临时视图在数据帧上直接使用 spark SQL 使用 not exist
:
table_withNull_df.createOrReplaceTempView("table_withNull")
tblA_NoNull_df.createOrReplaceTempView("tblA_NoNull")
result_df = spark.sql("""
select * from table_withNull
where not exists
(select 1 from
tblA_NoNull
where table_withNull.id = tblA_NoNull.id)
""")
这种方法可以优先于左反连接,因为它们会导致意外的 BroadcastNestedLoopJoin 导致广播超时(即使没有在反连接中明确请求广播)。
之后你可以write.mode("append")
插入之前没有遇到的数据。
示例取自here
【讨论】:
【参考方案3】:恕我直言,我认为 Spark 中不存在这样的属性。我认为您可以使用两种方法:
UNIQUE
条件的解决方法(典型的关系数据库):通过这种方式,当您尝试插入(在append
模式下)一个已经存在的记录时,您将得到一个您可以正确处理的异常处理。
读取你要写入的表,outer join
它与你要添加到上述表中的数据,然后将结果写入overwrite mode
(但我认为第一个解决方案可能性能更好)。
更多详情欢迎咨询
【讨论】:
以上是关于如何在 pyspark 中使用“不存在”的 SQL 条件?的主要内容,如果未能解决你的问题,请参考以下文章
如何在字典中使用 pyspark.sql.functions.when() 的多个条件?
如何在 pyspark.sql.functions.when() 中使用多个条件?