优先加入 PySpark 数据帧

Posted

技术标签:

【中文标题】优先加入 PySpark 数据帧【英文标题】:Prioritized joining of PySpark dataframes 【发布时间】:2021-02-12 12:39:47 【问题描述】:

假设我有两个 PySpark 数据框。

df1
| A     | B              |
| ----- | -------------- |
| foo   | B1             |
| bar   | B2             |
| baz   | B3             |
| lol   | B9            |

df2
| X      | Y  | Z       |
| ------ | -- | --------|
| bar    | B1 | Cool    |
| foo    | B2 | Awesome |
| val    | B3 | Superb  |
| bar    | B4 | Nice    |

如何将这些数据框加入df3,以便我

    优先加入df1["A"]df2["X"],并从df2["Z"]获取值, 和 如果任何df3["Z"] 值是null,请填写null 值 将df1["B"]df2["Y"] 连接并从df2["Z"] 获取值的结果?

举例说明我想以df4 而不是df3 结束(注意df3 中的null 值):

df3
| A   | B  | Z       |
| --- | -- | ------- |
| foo | B1 | Awesome |
| bar | B2 | Cool    |
| bar | B4 | Nice    |
| baz | B3 | null    |
| lol | B9 | null    |

df4
| A   | B  | Z       |
| --- | -- | ------- |
| foo | B1 | Awesome |
| bar | B2 | Cool    |
| bar | B4 | Nice    |
| baz | B3 | Superb  |
| lol | B9 | null    |

我的非简化现实世界示例有很多重复项、很多列等,所以我看不出一个简单的 when/otherwise 语句是否足够(或者我可能完全迷失了......)。有什么建议吗?

【问题讨论】:

【参考方案1】:

你可以尝试做两个连接:

import pyspark.sql.functions as F

df4 = df1.join(
    df2,
    df1['A'] == df2['X'],
    'left'
).select(
    'A', 'B', 'Z'
).alias('df3').join(
    df2.alias('df2'),
    F.expr('df3.B = df2.Y and df3.Z is null'),
    'left'
).select(
    'A', 'B', F.coalesce('df3.z', 'df2.z').alias('z')
)

df4.show()
+---+---+-------+
|  A|  B|      z|
+---+---+-------+
|foo| B1|Awesome|
|bar| B2|   Nice|
|bar| B2|   Cool|
|baz| B3| Superb|
|lol| B9|   null|
+---+---+-------+

或者,如果您只喜欢 1 次加入,

df4 = df1.join(
    df2,
    (df1['A'] == df2['X']) | (df1['B'] == df2['Y']), 
    'left'
).selectExpr(
    '*',
    'max(A = X) over(partition by A, B) as flag'
).filter(
    '(flag and A = X) or not flag or flag is null'
).select(
    'A','B','Z'
)

df4.show()
+---+---+-------+
|  A|  B|      Z|
+---+---+-------+
|bar| B2|   Cool|
|bar| B2|   Nice|
|foo| B1|Awesome|
|lol| B9|   null|
|baz| B3| Superb|
+---+---+-------+

【讨论】:

以上是关于优先加入 PySpark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章

根据匹配值加入两个 pyspark 数据帧(直到某个小数点)

加入两个数据框pyspark

使用复杂的条件逻辑加入 Pyspark 数据帧(可能使用地图代替)

Pyspark:加入 2 个数据帧以仅从第 2 个数据帧获取新记录(历史化)

为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?

Pyspark - 如何将多个数据帧的列连接成一个数据帧的列