pyspark 将行转换为列

Posted

技术标签:

【中文标题】pyspark 将行转换为列【英文标题】:pyspark convert rows to columns 【发布时间】:2021-03-08 10:45:02 【问题描述】:

我有一个数据框,我需要将同一组的行转换为列。基本上以这些为中心。下面是我的df。

+------------+-------+-----+-------+
|Customer    |ID     |unit |order  |
+------------+-------+-----+-------+
|John        |123    |00015|1      |
|John        |123    |00016|2      |
|John        |345    |00205|3      |
|John        |345    |00206|4      |
|John        |789    |00283|5      |
|John        |789    |00284|6      |
+------------+-------+-----+-------+

我需要上述结果数据作为..

+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|state   | ID_1  | unit_1 |seq_num_1 | ID_2   | unit_2 | seq_num_2 | ID_3   |unit_3 |seq_num_3 |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|John    | 123   | 00015  | 1        |  345   | 00205  | 3         |  789   |00283  | 5        |
|John    | 123   | 00016  | 2        |  345   | 00206  | 4         |  789   |00284  | 6        |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+

我尝试了 groupBy 和 pivot() 函数,但它的抛出错误表明找到了大的枢轴值。有什么方法可以在不使用 pivot() 函数的情况下获得结果。非常感谢任何帮助。 谢谢。

【问题讨论】:

您可能在数据透视列中有超过 10000 个不同的值,这是默认的最大值 (spark.sql.pivotMaxValues) @blackbishop.yes。这是正确的。有什么方法可以在不使用 pivot() 函数的情况下实现结果。 你可以随时增加这个值spark.conf.set("spark.sql.pivotMaxValues", newMaxValue),但是pivot是一个资源密集型操作,你可能会遇到性能问题。 是的.. 这就是原因,正在寻找任何替代解决方案。赞赏 可能你可以用 pandas udf 做一个分组地图。似乎您会根据客户和 ID 进行分组,然后生成所需的数据框作为输出。 Spark“堆叠”它们并输出最终的 Spark 数据帧。您可能需要附上一些外部数据,但似乎应该可行。 【参考方案1】:

这看起来像是使用 dense_rank() 聚合函数来创建一个通用序列(在下面的代码中为dr)的典型案例,每个序列下都有不同的ID 客户组,然后在此序列上进行旋转。我们可以使用 row_number() 执行类似于order 列的操作,以便在 groupby 中使用它:

from pyspark.sql import Window, functions as F

# below I added an extra row for a reference when the number of rows vary for different IDs
df = spark.createDataFrame([
    ('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
    ('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
    ('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])

添加两个 Window Specs:w1 获得 dense_rank()ID 超过 Customerw2 获得 row_number() of order 在相同的 CustomerID 下。

w1 = Window.partitionBy('Customer').orderBy('ID')
w2 = Window.partitionBy('Customer','ID').orderBy('order')

根据以上两个 WinSpec 添加两个新列:dr(dense_rank) 和 sid(row_number)

df1 = df.select(
    "*", 
    F.dense_rank().over(w1).alias('dr'), 
    F.row_number().over(w2).alias('sid')
)
+--------+---+-----+-----+---+---+
|Customer| ID| unit|order| dr|sid|
+--------+---+-----+-----+---+---+
|    John|123|00015|    1|  1|  1|
|    John|123|00016|    2|  1|  2|
|    John|345|00205|    3|  2|  1|
|    John|345|00206|    4|  2|  2|
|    John|789|00283|    5|  3|  1|
|    John|789|00284|    6|  3|  2|
|    John|789|00285|    7|  3|  3|
+--------+---+-----+-----+---+---+

找到max(dr),这样我们就可以预先定义以range(1,N+1)为中心的列表(这将提高pivot方法的效率)。

N = df1.agg(F.max('dr')).first()[0]

Groupby Customer, sid 并使用 dr 进行旋转,然后进行聚合:

df_new = df1.groupby('Customer','sid') \
    .pivot('dr', range(1,N+1)) \
    .agg(
        F.first('ID').alias('ID'),
        F.first('unit').alias('unit'),
        F.first('order').alias('order')
)

df_new.show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|1_ID|1_unit|1_order|2_ID|2_unit|2_order|3_ID|3_unit|3_order|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|    John|  1| 123| 00015|      1| 345| 00205|      3| 789| 00283|      5|
|    John|  2| 123| 00016|      2| 345| 00206|      4| 789| 00284|      6|
|    John|  3|null|  null|   null|null|  null|   null| 789| 00285|      7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+

根据需要重命名列名:

import re
df_new.toDF(*['_'.join(reversed(re.split('_',c,1))) for c in df_new.columns]).show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|ID_1|unit_1|order_1|ID_2|unit_2|order_2|ID_3|unit_3|order_3|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|    John|  1| 123| 00015|      1| 345| 00205|      3| 789| 00283|      5|
|    John|  2| 123| 00016|      2| 345| 00206|      4| 789| 00284|      6|
|    John|  3|null|  null|   null|null|  null|   null| 789| 00285|      7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+

【讨论】:

【参考方案2】:

以下是我的解决方案.. 进行排名,然后将结果展平。

df = spark.createDataFrame([
    ('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
    ('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
    ('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])

rankedDF = df.withColumn("rank", row_number().over(Window.partitionBy("customer").orderBy("order")))
w1 = Window.partitionBy("customer").orderBy("order")
groupedDF = rankedDF.select("customer", "rank", collect_list("ID").over(w1).alias("ID"), collect_list("unit").over(w1).alias("unit"), collect_list("order").over(w1).alias("seq_num")).groupBy("customer", "rank").agg(max("ID").alias("ID"), max("unit").alias("unit"), max("seq_num").alias("seq_num") )    
groupedColumns = [col("customer")]
pivotColumns = map(lambda i:map(lambda a:col(a)[i-1].alias(a + "_" + `i`), ["ID", "unit", "seq_num"]), [1,2,3])
flattenedCols = [item for sublist in pivotColumns for item in sublist]
finalDf=groupedDF.select(groupedColumns + flattenedCols)

【讨论】:

【参考方案3】:

可能有多种方法可以做到这一点,但 pandas udf 可以是其中一种。这是一个基于您的数据的玩具示例:

df = pd.DataFrame('Customer': ['John']*6, 
                   'ID': [123]*2 + [345]*2 + [789]*2, 
                   'unit': ['00015', '00016', '00205', '00206', '00283', '00284'], 
                   'order': range(1, 7))
sdf = spark.createDataFrame(df)

# Spark 2.4 syntax. Spark 3.0 is less verbose
return_types = 'state string, ID_1 int, unit_1 string, seq_num_1 int, ID_2int, unit_2 string, seq_num_2 int, ID_3 int, unit_3 string, seq_num_3 int'
@pandas_udf(returnType=return_types, functionType=PandasUDFType.GROUPED_MAP)
def convert_to_wide(pdf):
    groups = pdf.groupby('ID')
    out = pd.concat([group.set_index('Customer') for _, group in groups], axis=1).reset_index()
    out.columns = ['state', 'ID_1', 'unit_1', 'seq_num_1', 'ID_2', 'unit_2', 'seq_num_2', 'ID_3', 'unit_3', 'seq_num_3']
    return out

sdf.groupby('Customer').apply(convert_to_wide).show()

+-----+----+------+---------+----+------+---------+----+------+---------+
|state|ID_1|unit_1|seq_num_1|ID_2|unit_2|seq_num_2|ID_3|unit_3|seq_num_3|
+-----+----+------+---------+----+------+---------+----+------+---------+
| John| 123| 00015|        1| 345| 00205|        3| 789| 00283|        5|
| John| 123| 00016|        2| 345| 00206|        4| 789| 00284|        6|
+-----+----+------+---------+----+------+---------+----+------+---------+

【讨论】:

感谢您的帮助。不幸的是,我们的集群在 spark 2.3 上运行,并且由于此错误而失败..“ PyArrow >= 0.8.0 必须安装;但是,它没有找到。”。我无法安装它。有什么选择吗?谢谢。 @mathfish 不确定您是如何连接到集群的(例如客户端或集群),但我相信无论哪种情况,您都可以使用 --archives 传递一个压缩的 python 环境供驱动程序和工作人员使用选项。但我不想把你带进兔子洞。最糟糕的是,您可以直接转换表格。作为奖励,它是学习核心 pyspark 的好方法。

以上是关于pyspark 将行转换为列的主要内容,如果未能解决你的问题,请参考以下文章

将 RDD 转换为列联表:Pyspark

PySpark:如何将行转换为向量?

Pyspark 将行数据转换为键值对

如何将行转换为pyspark中的字典列表?

Pyspark(Dataframes)逐行读取文件(将行转换为字符串)

将行列表保存到 pyspark 中的 Hive 表