Pyspark 加入数据框

Posted

技术标签:

【中文标题】Pyspark 加入数据框【英文标题】:Pyspark joining dataframes 【发布时间】:2021-08-31 20:25:09 【问题描述】:

我想加入两个数据框,但我的目标是为第一个 DF 的每个 id 重复第二个 DF 中的所有行。

例如:

第一个 DF:

id user score
1 H234 3
2 H123 4

第二个DF:

id trait conditional_score
1 Blood Pressure 2
2 Stroke 4
3 Joint Pain 3

输出DF:

id user condition trait conditional_score
1 H234 3 Blood Pressure 3
1 H234 3 Stroke 7
1 H234 3 Joint Pain 6
2 H123 4 Blood Pressure 4
2 H123 4 Stroke 8
2 H123 4 Joint Pain 4

因此,平均的想法是将第二个 DataFrame 的所有数据添加到第一个数据帧,如果 condition 等于或小于 conditional_score,则将列 conditionconditional_score 相加。

我是 PySpark 的新手,不知道如何继续,有人可以帮我吗?

【问题讨论】:

【参考方案1】:

您可以使用交叉连接来实现此目的。

您应该确保将 spark.sql.crossJoin.enabled=true configuration property 设置为 true。

方法一:使用 Spark SQL

然后您可以通过使用 spark sql 来实现此目的

    为每个数据框创建临时视图
first_df.createOrReplaceTempView("first_df")
second_df.createOrReplaceTempView("second_df")
    在 Spark 会话中运行以下 sql
output_df = spark_session.sql("""
    SELECT
        first_df.id,
        first_df.user,
        first_df.score as condition,
        second_df.trait,
        CASE
            WHEN first_df.score <= second_df.conditional_score THEN first_df.score + second_df.conditional_score
            ELSE first_df.score 
        END as conditional_score
    FROM
        first_df
    CROSS JOIN
        second_df
""")

方法 2:使用 Pyspark API

您也可以使用 pyspark api 实现此目的

from pyspark.sql import functions as F

output_df = first_df.alias("first_df")\
                    .crossJoin(second_df.alias("second_df"))\
                    .select(
                        F.col('first_df.id'),
                        F.col('first_df.user'),
                        F.col('first_df.score').alias('condition'),
                        F.col('second_df.trait'),
                        F.when( 
 F.col('first_df.score') <= F.col('second_df.conditional_score'), F.col('first_df.score') +F.col('second_df.conditional_score')
).otherwise( F.col('first_df.score') ).alias("conditional_score")
                    )
        

让我知道这是否适合你。

【讨论】:

以上是关于Pyspark 加入数据框的主要内容,如果未能解决你的问题,请参考以下文章

加入两个数据框和结果数据框在 PySpark 中包含不重复的项目?

加入两个分区数据框pyspark

加入后替换pyspark数据框中的列

pyspark 将最小值添加回数据框

我有一个 pyspark 数据框,我必须在其中加入两列 orderby 升序并从中提取最高值

使用 pyspark 数据框中的复制名称加入后使用左表中的所有列进行分组