Pyspark 加入数据框
Posted
技术标签:
【中文标题】Pyspark 加入数据框【英文标题】:Pyspark joining dataframes 【发布时间】:2021-08-31 20:25:09 【问题描述】:我想加入两个数据框,但我的目标是为第一个 DF 的每个 id 重复第二个 DF 中的所有行。
例如:
第一个 DF:
id | user | score |
---|---|---|
1 | H234 | 3 |
2 | H123 | 4 |
第二个DF:
id | trait | conditional_score |
---|---|---|
1 | Blood Pressure | 2 |
2 | Stroke | 4 |
3 | Joint Pain | 3 |
输出DF:
id | user | condition | trait | conditional_score |
---|---|---|---|---|
1 | H234 | 3 | Blood Pressure | 3 |
1 | H234 | 3 | Stroke | 7 |
1 | H234 | 3 | Joint Pain | 6 |
2 | H123 | 4 | Blood Pressure | 4 |
2 | H123 | 4 | Stroke | 8 |
2 | H123 | 4 | Joint Pain | 4 |
因此,平均的想法是将第二个 DataFrame 的所有数据添加到第一个数据帧,如果 condition
等于或小于 conditional_score
,则将列 condition
和 conditional_score
相加。
我是 PySpark 的新手,不知道如何继续,有人可以帮我吗?
【问题讨论】:
【参考方案1】:您可以使用交叉连接来实现此目的。
您应该确保将 spark.sql.crossJoin.enabled=true
configuration property 设置为 true。
方法一:使用 Spark SQL
然后您可以通过使用 spark sql 来实现此目的
-
为每个数据框创建临时视图
first_df.createOrReplaceTempView("first_df")
second_df.createOrReplaceTempView("second_df")
-
在 Spark 会话中运行以下 sql
output_df = spark_session.sql("""
SELECT
first_df.id,
first_df.user,
first_df.score as condition,
second_df.trait,
CASE
WHEN first_df.score <= second_df.conditional_score THEN first_df.score + second_df.conditional_score
ELSE first_df.score
END as conditional_score
FROM
first_df
CROSS JOIN
second_df
""")
方法 2:使用 Pyspark API
您也可以使用 pyspark api 实现此目的
from pyspark.sql import functions as F
output_df = first_df.alias("first_df")\
.crossJoin(second_df.alias("second_df"))\
.select(
F.col('first_df.id'),
F.col('first_df.user'),
F.col('first_df.score').alias('condition'),
F.col('second_df.trait'),
F.when(
F.col('first_df.score') <= F.col('second_df.conditional_score'), F.col('first_df.score') +F.col('second_df.conditional_score')
).otherwise( F.col('first_df.score') ).alias("conditional_score")
)
让我知道这是否适合你。
【讨论】:
以上是关于Pyspark 加入数据框的主要内容,如果未能解决你的问题,请参考以下文章
加入两个数据框和结果数据框在 PySpark 中包含不重复的项目?