将 SQL 连接查询转换为 pyspark 语法

Posted

技术标签:

【中文标题】将 SQL 连接查询转换为 pyspark 语法【英文标题】:convert SQL join query to pyspark syntax 【发布时间】:2018-03-31 15:34:18 【问题描述】:

我正在努力将已知的工作 SQL 查询转换为在 pyspark 中工作,给定两个数据帧,使用以下方法:.join.wherefilter 等。

以下是有效的 SQL 查询示例(仅选择 r.id,我通常会选择更多列):

# "invalid" records, where there is a matching `record_id` for rv_df
SELECT DISTINCT(r.id) FROM core_record AS r LEFT OUTER JOIN core_recordvalidation rv ON r.id = rv.record_id WHERE r.job_id = 41 AND rv.record_id is not null;

# "valid" records, where there is no matching `record_id` for rv_df
SELECT DISTINCT(r.id) FROM core_record AS r LEFT OUTER JOIN core_recordvalidation rv ON r.id = rv.record_id WHERE r.job_id = 41 AND rv.record_id is not null;

我已接近 80/20,但无法理解最后几个步骤和/或如何最有效地执行此操作。

我有一个 Dataframe r_dfid 我想加入 Dataframe rv_dfrecord_id。作为输出,我只想要 distinct r.id,并且只想要来自 r_df 的列,没有来自 rv_df 的列。最后,我想要两个不同的调用,其中 匹配(对我来说什么是“无效”记录),以及 不是 匹配(我考虑“有效”记录)。

我有接近的 pyspark 查询,但不太清楚如何确保 r_df.id 是不同的,并且仅从 r_df 中选择列,从 rv_df 中不选择列。

任何帮助将不胜感激!

【问题讨论】:

您问题中的两个查询在我看来都是一样的。将其转换为 DataFrame 函数将是:invalid_df = r_df.alias('r').join(rv_df.withColumn('record_id', f.col('id')).alias('rv'), on='id', how='left_outer').where('(r.job_id = 41) AND (rv.record_id is not null)').select('r.id').distinct()。基于docs for join列必须存在于两边,这就是我在rv_df 上创建id 列的原因。 【参考方案1】:

只需要离开几个小时。找到了适合我用例的解决方案。

首先,从rv_df 中只选择不同的record_id

rv_df = rv_df.select('record_id').distinct()

然后将其用于交叉和不相交:

# Intersection:
j_df = r_df.join(rv_df, r_df.id == rv_df.record_id, 'leftsemi').select(r_df['*'])

# Disjoint:
j_df = r_df.join(rv_df, r_df.id == rv_df.record_id, 'leftanti').select(r_df['*'])

【讨论】:

以上是关于将 SQL 连接查询转换为 pyspark 语法的主要内容,如果未能解决你的问题,请参考以下文章

PYSPARK:如何将带有多个 case 语句的 SQL 查询转换为 Pyspark/Pyspark-SQL?

如何将 sql 查询转换为 Pandas Dataframe 和 PySpark Dataframe

Pyspark 连接到 Microsoft SQL 服务器?

如何将带有内连接语句的 Sql 查询转换为带有 Where 语句的 sql 查询(语句中没有内连接)

将pyspark列连接到pyspark DataFrame

如何将此 SQL 内部联接查询转换为 LINQ 语法?