将 SQL 连接查询转换为 pyspark 语法
Posted
技术标签:
【中文标题】将 SQL 连接查询转换为 pyspark 语法【英文标题】:convert SQL join query to pyspark syntax 【发布时间】:2018-03-31 15:34:18 【问题描述】:我正在努力将已知的工作 SQL 查询转换为在 pyspark 中工作,给定两个数据帧,使用以下方法:.join
、.where
、filter
等。
以下是有效的 SQL 查询示例(仅选择 r.id
,我通常会选择更多列):
# "invalid" records, where there is a matching `record_id` for rv_df
SELECT DISTINCT(r.id) FROM core_record AS r LEFT OUTER JOIN core_recordvalidation rv ON r.id = rv.record_id WHERE r.job_id = 41 AND rv.record_id is not null;
# "valid" records, where there is no matching `record_id` for rv_df
SELECT DISTINCT(r.id) FROM core_record AS r LEFT OUTER JOIN core_recordvalidation rv ON r.id = rv.record_id WHERE r.job_id = 41 AND rv.record_id is not null;
我已接近 80/20,但无法理解最后几个步骤和/或如何最有效地执行此操作。
我有一个 Dataframe r_df
列 id
我想加入 Dataframe rv_df
列 record_id
。作为输出,我只想要 distinct r.id
,并且只想要来自 r_df
的列,没有来自 rv_df
的列。最后,我想要两个不同的调用,其中 匹配(对我来说什么是“无效”记录),以及 不是 匹配(我考虑“有效”记录)。
我有接近的 pyspark 查询,但不太清楚如何确保 r_df.id
是不同的,并且仅从 r_df
中选择列,从 rv_df
中不选择列。
任何帮助将不胜感激!
【问题讨论】:
您问题中的两个查询在我看来都是一样的。将其转换为 DataFrame 函数将是:invalid_df = r_df.alias('r').join(rv_df.withColumn('record_id', f.col('id')).alias('rv'), on='id', how='left_outer').where('(r.job_id = 41) AND (rv.record_id is not null)').select('r.id').distinct()
。基于docs for join
:列必须存在于两边,这就是我在rv_df
上创建id
列的原因。
【参考方案1】:
只需要离开几个小时。找到了适合我用例的解决方案。
首先,从rv_df
中只选择不同的record_id
:
rv_df = rv_df.select('record_id').distinct()
然后将其用于交叉和不相交:
# Intersection:
j_df = r_df.join(rv_df, r_df.id == rv_df.record_id, 'leftsemi').select(r_df['*'])
# Disjoint:
j_df = r_df.join(rv_df, r_df.id == rv_df.record_id, 'leftanti').select(r_df['*'])
【讨论】:
以上是关于将 SQL 连接查询转换为 pyspark 语法的主要内容,如果未能解决你的问题,请参考以下文章
PYSPARK:如何将带有多个 case 语句的 SQL 查询转换为 Pyspark/Pyspark-SQL?
如何将 sql 查询转换为 Pandas Dataframe 和 PySpark Dataframe
Pyspark 连接到 Microsoft SQL 服务器?