Pyspark 以多个列名为中心

Posted

技术标签:

【中文标题】Pyspark 以多个列名为中心【英文标题】:Pyspark pivot on multiple column names 【发布时间】:2020-08-06 13:58:13 【问题描述】:

我目前有一个数据框 df

id | c1   | c2   | c3 |
1  | diff | same | diff
2  | same | same | same
3  | diff | same | same
4  | same | same | same

我希望我的输出看起来像

name| diff | same
c1  |   2  | 2
c2  |   0  | 4
c3  |   1  | 3

当我尝试时:

df.groupby('c2').pivot('c2').count() -> transformation A

|f2   | diff | same |
|same | null |  2
|diff | 2    |  null

我假设我需要为每一列编写一个循环并通过转换 A 传递它? 但是我在正确转换 A 时遇到问题。 请帮忙

【问题讨论】:

你还有其他不同的值吗? 你现在可以假设没有 【参考方案1】:

Pivot 是一种昂贵的 shuffle 操作,如果可能应该避免。尝试将此逻辑与 arrays_zip and explode 一起使用以动态折叠列groupby-aggregate

from pyspark.sql import functions as F   

df.withColumn("cols", F.explode(F.arrays_zip(F.array([F.array(F.col(x),F.lit(x))\
                                                    for x in df.columns if x!='id']))))\
  .withColumn("name", F.col("cols.0")[1]).withColumn("val", F.col("cols.0")[0]).drop("cols")\
  .groupBy("name").agg(F.count(F.when(F.col("val")=='diff',1)).alias("diff"),\
                       F.count(F.when(F.col("val")=='same',1)).alias("same")).orderBy("name").show()

#+----+----+----+
#|name|diff|same|
#+----+----+----+
#|  c1|   2|   2|
#|  c2|   0|   4|
#|  c3|   1|   3|
#+----+----+----+

您也可以通过 exploding a map_type 创建一个 map dynamically 来执行此操作。

from pyspark.sql import functions as F
from itertools import chain

df.withColumn("cols", F.create_map(*(chain(*[(F.lit(name), F.col(name))\
                                  for name in df.columns if name!='id']))))\
  .select(F.explode("cols").alias("name","val"))\
  .groupBy("name").agg(F.count(F.when(F.col("val")=='diff',1)).alias("diff"),\
                       F.count(F.when(F.col("val")=='same',1)).alias("same")).orderBy("name").show()

#+----+----+----+
#|name|diff|same|
#+----+----+----+
#|  c1|   2|   2|
#|  c2|   0|   4|
#|  c3|   1|   3|
#+----+----+----+

【讨论】:

【参考方案2】:
from pyspark.sql.functions import *
df = spark.createDataFrame([(1,'diff','same','diff'),(2,'same','same','same'),(3,'diff','same','same'),(4,'same','same','same')],['idcol','C1','C2','C3'])
df.createOrReplaceTempView("MyTable")
#spark.sql("select * from MyTable").collect()
x1=spark.sql("select idcol, 'C1' AS col, C1 from MyTable union all select idcol, 'C2' AS col, C2 from MyTable  union all select idcol, 'C3' AS col, C3 from MyTable")
#display(x1)
x2=x1.groupBy('col').pivot('C1').agg(count('C1')).orderBy('col')
display(x2)

【讨论】:

您好,欢迎来到堆栈溢出。感谢您发布此答案。您是否有机会编辑答案以解释它的工作原理以及它与其他答案的不同之处?

以上是关于Pyspark 以多个列名为中心的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 读取多个 csv 文件

Pyspark:将多个数组列拆分为行

Pyspark:将多个数组列拆分为行

Pyspark 使用 udf 处理数组列并返回另一个数组

在 PySpark 中的多个列上应用 MinMaxScaler

PySpark - 使用 withColumnRenamed 重命名多个列