PySpark 根据名称将列表分解为多列

Posted

技术标签:

【中文标题】PySpark 根据名称将列表分解为多列【英文标题】:PySpark explode list into multiple columns based on name 【发布时间】:2017-11-29 11:45:36 【问题描述】:

您好,我正在处理一种稍微困难的文件格式,我正在尝试清理它以备将来处理。我一直在使用 Pyspark 将数据处理成数据框。

文件看起来类似于:

AA 1234  ZXYW
BB A 890
CC B 321
AA 1234  LMNO
BB D 123
CC E 321
AA 1234  ZXYW
CC E 456

每条“AA”记录定义一个或多个逻辑组的开始,每行的数据是固定长度的,并且其中包含我要提取的编码信息。至少有 20-30 种不同的记录类型。它们总是在每行的开头用两个字母代码标识。每个组中可以有 1 种或多种不同的记录类型(即,并非每个组都存在所有记录类型)

作为第一阶段,我设法将记录按以下格式分组:

+----------------+---------------------------------+
|           index|                           result|
+----------------+---------------------------------+
|               1|[AA 1234  ZXYV,BB A 890,CC B 321]|
|               2|[AA 1234  LMNO,BB D 123,CC E 321]|
|               3|[AA 1234  ZXYV,CC B 321]         |
+----------------+---------------------------------+

作为第二阶段,我真的想将数据放入数据框中的以下列:

+----------------+---------------------------------+-------------+--------+--------+
|           index|                           result|           AA|      BB|      CC|
+----------------+---------------------------------+-------------+--------+--------+
|               1|[AA 1234  ZXYV,BB A 890,CC B 321]|AA 1234  ZXYV|BB A 890|CC B 321|
|               2|[AA 1234  LMNO,BB D 123,CC E 321]|AA 1234  LMNO|BB D 123|CC E 321|
|               3|[AA 1234  ZXYV,CC B 321]         |AA 1234  ZXYV|    Null|CC B 321|
+----------------+---------------------------------+-------------+--------+--------+

因为那时提取我需要的信息应该是微不足道的。

有人对我如何能够做到这一点有任何建议吗?

非常感谢。

【问题讨论】:

【参考方案1】:

在不转换为 rdd 的情况下分解数组的另一种方法,

from pyspark.sql import functions as F

udf1 = F.udf(lambda x : x.split()[0])
df.select('index',F.explode('result').alias('id'),udf1(F.col('id')).alias('idtype')).show()

+-----+-------------+------+
|index|           id|idtype|
+-----+-------------+------+
|    1|AA 1234  ZXYV|    AA|
|    1|     BB A 890|    BB|
|    1|     CC B 321|    CC|
|    2|AA 1234  LMNO|    AA|
|    2|     BB D 123|    BB|
|    2|     CC E 321|    CC|
|    3|AA 1234  ZXYV|    AA|
|    3|     CC B 321|    CC|
+-----+-------------+------+ 

df1.groupby('index').pivot('idtype').agg(F.first('id')).join(df,'index').show()

【讨论】:

【参考方案2】:

您可以使用flatMappivot 来实现此目的。从第一阶段的结果开始:

rdd = sc.parallelize([(1,['AA 1234  ZXYV','BB A 890','CC B 321']),
                      (2,['AA 1234  LMNO','BB D 123','CC E 321']),
                      (3,['AA 1234  ZXYV','CC B 321'])])

df = rdd.toDF(['index', 'result'])

您可以先使用flatMap 将数组分解为多行,然后将两个字母标识符提取到单独的列中。

df_flattened = df.rdd.flatMap(lambda x: [(x[0],y, y[0:2],y[3::]) for y in x[1]])\
               .toDF(['index','result', 'identifier','identifiertype'])

并使用pivot将两个字母标识符更改为列名:

df_result = df_flattened.groupby(df_flattened.index,)\
                        .pivot("identifier")\
                        .agg(first("identifiertype"))\
                        .join(df,'index')

我添加了连接以返回 result

【讨论】:

这绝对完美,正是我所需要的。非常感谢您的帮助。【参考方案3】:

假设您使用的是 Spark 2.x,我认为您正在寻找的是 spark 数据帧上的枢轴操作。

首先,您可以创建一个只有 2 列、2 个字母编码和另一列中的其余内容的表。然后您可以在数据帧上使用 pivot 来执行此操作,如下所示。

df.pivot("encoding_col",Seq("AA","BB"))

您可以找到一些使用数据框进行透视的好示例here

【讨论】:

以上是关于PySpark 根据名称将列表分解为多列的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 将 JSON 字符串分解为多列

Pyspark 将数组列分解为带有滑动窗口的子列表

将列表的列拆分为同一 PySpark 数据框中的多列

如何将 map_keys() 中的值拆分为 PySpark 中的多列

需要根据表中的唯一值将一列分解为多列?

pyspark将单列转换为多列[重复]