PySpark 或 SQL:消耗合并

Posted

技术标签:

【中文标题】PySpark 或 SQL:消耗合并【英文标题】:PySpark or SQL: consuming coalesce 【发布时间】:2020-08-18 19:59:29 【问题描述】:

我正在尝试将多个输入列合并为 pyspark 数据框或 sql 表中的多个输出列。

每个输出列将包含“第一个可用”输入值,然后“消耗”它,因此输入值对于后续输出列不可用。

+----+-----+-----+-----+-----+-----+---+------+------+------+
| ID | in1 | in2 | in3 | in4 | in5 | / | out1 | out2 | out3 |
+----+-----+-----+-----+-----+-----+---+------+------+------+
|  1 |     |     | C   |     |     | / | C    |      |      |
|  2 | A   |     | C   |     | E   | / | A    | C    | E    |
|  3 | A   | B   | C   |     |     | / | A    | B    | C    |
|  4 | A   | B   | C   | D   | E   | / | A    | B    | C    |
|  5 |     |     |     |     |     | / |      |      |      |
|  6 |     | B   |     |     | E   | / | B    | E    |      |
|  7 |     | B   |     | D   | E   | / | B    | D    | E    |
+----+-----+-----+-----+-----+-----+---+------+------+------+

最好的方法是什么?

编辑:澄清 - in1、in2、in3 等。可以是任何值

【问题讨论】:

您可以采用与here类似的方法 【参考方案1】:

这里是方法。

import pyspark.sql.functions as f

df = spark.read.option("header","true").option("inferSchema","true").csv("test.csv")

cols = df.columns
cols.remove('ID')

df2 = df.withColumn('ins', f.array_except(f.array(*cols), f.array(f.lit(None))))

for i in range(0, 3):
    df2 = df2.withColumn('out' + str(i+1), f.col('ins')[i])
    
df2.show(10, False)

+---+----+----+----+----+----+---------------+----+----+----+
|ID |in1 |in2 |in3 |in4 |in5 |ins            |out1|out2|out3|
+---+----+----+----+----+----+---------------+----+----+----+
|1  |null|null|C   |null|null|[C]            |C   |null|null|
|2  |A   |null|C   |null|E   |[A, C, E]      |A   |C   |E   |
|3  |A   |B   |C   |null|null|[A, B, C]      |A   |B   |C   |
|4  |A   |B   |C   |D   |E   |[A, B, C, D, E]|A   |B   |C   |
|5  |null|null|null|null|null|[]             |null|null|null|
|6  |null|B   |null|null|E   |[B, E]         |B   |E   |null|
|7  |null|B   |null|D   |E   |[B, D, E]      |B   |D   |E   |
+---+----+----+----+----+----+---------------+----+----+----+

【讨论】:

【参考方案2】:

以下答案假设输入列内的value 包含二值布尔逻辑,这意味着in1 只能是'A',无,in2 只能是'B',无 等。

import pyspark.sql.functions as F

df = spark.createDataFrame([[1, None, None, 'C', 'D', None],
                            [2, 'A', None, 'C', None, 'E'],
                            [3, None , 'B', 'C', None, 'E'],
                            [4, None, None, 'C', None, 'E'],
                            [5, 'A', 'B', 'C', None, None]],
                           ['ID', 'in1', 'in2', 'in3', 'in4', 'in5'])

df2 = df.withColumn('out1', F.when(F.col('in1').isNotNull(), F.col('in1'))
                             .when(F.col('in2').isNotNull(), F.col('in2'))
                             .when(F.col('in3').isNotNull(), F.col('in3'))
                             .when(F.col('in4').isNotNull(), F.col('in4'))
                             .when(F.col('in5').isNotNull(), F.col('in5'))
                   )\
        .withColumn('out2', F.when(F.col('out1') < F.col('in2'), F.col('in2'))
                             .when(F.col('out1') < F.col('in3'), F.col('in3'))
                             .when(F.col('out1') < F.col('in4'), F.col('in4'))
                             .when(F.col('out1') < F.col('in5'), F.col('in5'))
                   )\
        .withColumn('out3', F.when(F.col('out2') < F.col('in3'), F.col('in3'))
                             .when(F.col('out2') < F.col('in4'), F.col('in4'))
                             .when(F.col('out2') < F.col('in5'), F.col('in5'))
                   )

df2.show()
>>>
+---+----+----+---+----+----+----+----+----+
| ID| in1| in2|in3| in4| in5|out1|out2|out3|
+---+----+----+---+----+----+----+----+----+
|  1|null|null|  C|   D|null|   C|   D|null|
|  2|   A|null|  C|null|   E|   A|   C|   E|
|  3|null|   B|  C|null|   E|   B|   C|   E|
|  4|null|null|  C|null|   E|   C|   E|null|
|  5|   A|   B|  C|null|null|   A|   B|   C|
+---+----+----+---+----+----+----+----+----+

上面的答案利用了这样一个事实,即对于像F.when(F.col('out1') &lt; F.col('in2'), F.col('in2')) 这样的表达式,如果out1in2 都不是Null,则条件仅解析为True,因此只有out2 才会在@987654329 时被填充@ 已填满。

【讨论】:

以上是关于PySpark 或 SQL:消耗合并的主要内容,如果未能解决你的问题,请参考以下文章

PySpark SQL:合并 .withColumn 调用

pyspark列合并为一行

pyspark:如果列在不同行中具有相同的值,则合并两行或多行

Pyspark:将 pyspark.sql.row 转换为 Dataframe

如何将共享 id 的多行合并为一行(PYSPARK)

pyspark - 将两个数据帧与目标中的额外列合并