PySpark 或 SQL:消耗合并
Posted
技术标签:
【中文标题】PySpark 或 SQL:消耗合并【英文标题】:PySpark or SQL: consuming coalesce 【发布时间】:2020-08-18 19:59:29 【问题描述】:我正在尝试将多个输入列合并为 pyspark 数据框或 sql 表中的多个输出列。
每个输出列将包含“第一个可用”输入值,然后“消耗”它,因此输入值对于后续输出列不可用。
+----+-----+-----+-----+-----+-----+---+------+------+------+
| ID | in1 | in2 | in3 | in4 | in5 | / | out1 | out2 | out3 |
+----+-----+-----+-----+-----+-----+---+------+------+------+
| 1 | | | C | | | / | C | | |
| 2 | A | | C | | E | / | A | C | E |
| 3 | A | B | C | | | / | A | B | C |
| 4 | A | B | C | D | E | / | A | B | C |
| 5 | | | | | | / | | | |
| 6 | | B | | | E | / | B | E | |
| 7 | | B | | D | E | / | B | D | E |
+----+-----+-----+-----+-----+-----+---+------+------+------+
最好的方法是什么?
编辑:澄清 - in1、in2、in3 等。可以是任何值
【问题讨论】:
您可以采用与here类似的方法 【参考方案1】:这里是方法。
import pyspark.sql.functions as f
df = spark.read.option("header","true").option("inferSchema","true").csv("test.csv")
cols = df.columns
cols.remove('ID')
df2 = df.withColumn('ins', f.array_except(f.array(*cols), f.array(f.lit(None))))
for i in range(0, 3):
df2 = df2.withColumn('out' + str(i+1), f.col('ins')[i])
df2.show(10, False)
+---+----+----+----+----+----+---------------+----+----+----+
|ID |in1 |in2 |in3 |in4 |in5 |ins |out1|out2|out3|
+---+----+----+----+----+----+---------------+----+----+----+
|1 |null|null|C |null|null|[C] |C |null|null|
|2 |A |null|C |null|E |[A, C, E] |A |C |E |
|3 |A |B |C |null|null|[A, B, C] |A |B |C |
|4 |A |B |C |D |E |[A, B, C, D, E]|A |B |C |
|5 |null|null|null|null|null|[] |null|null|null|
|6 |null|B |null|null|E |[B, E] |B |E |null|
|7 |null|B |null|D |E |[B, D, E] |B |D |E |
+---+----+----+----+----+----+---------------+----+----+----+
【讨论】:
【参考方案2】:以下答案假设输入列内的value
包含二值布尔逻辑,这意味着in1
只能是'A',无,in2
只能是'B',无 等。
import pyspark.sql.functions as F
df = spark.createDataFrame([[1, None, None, 'C', 'D', None],
[2, 'A', None, 'C', None, 'E'],
[3, None , 'B', 'C', None, 'E'],
[4, None, None, 'C', None, 'E'],
[5, 'A', 'B', 'C', None, None]],
['ID', 'in1', 'in2', 'in3', 'in4', 'in5'])
df2 = df.withColumn('out1', F.when(F.col('in1').isNotNull(), F.col('in1'))
.when(F.col('in2').isNotNull(), F.col('in2'))
.when(F.col('in3').isNotNull(), F.col('in3'))
.when(F.col('in4').isNotNull(), F.col('in4'))
.when(F.col('in5').isNotNull(), F.col('in5'))
)\
.withColumn('out2', F.when(F.col('out1') < F.col('in2'), F.col('in2'))
.when(F.col('out1') < F.col('in3'), F.col('in3'))
.when(F.col('out1') < F.col('in4'), F.col('in4'))
.when(F.col('out1') < F.col('in5'), F.col('in5'))
)\
.withColumn('out3', F.when(F.col('out2') < F.col('in3'), F.col('in3'))
.when(F.col('out2') < F.col('in4'), F.col('in4'))
.when(F.col('out2') < F.col('in5'), F.col('in5'))
)
df2.show()
>>>
+---+----+----+---+----+----+----+----+----+
| ID| in1| in2|in3| in4| in5|out1|out2|out3|
+---+----+----+---+----+----+----+----+----+
| 1|null|null| C| D|null| C| D|null|
| 2| A|null| C|null| E| A| C| E|
| 3|null| B| C|null| E| B| C| E|
| 4|null|null| C|null| E| C| E|null|
| 5| A| B| C|null|null| A| B| C|
+---+----+----+---+----+----+----+----+----+
上面的答案利用了这样一个事实,即对于像F.when(F.col('out1') < F.col('in2'), F.col('in2'))
这样的表达式,如果out1
和in2
都不是Null,则条件仅解析为True,因此只有out2
才会在@987654329 时被填充@ 已填满。
【讨论】:
以上是关于PySpark 或 SQL:消耗合并的主要内容,如果未能解决你的问题,请参考以下文章
pyspark:如果列在不同行中具有相同的值,则合并两行或多行