Pyspark 以多个列名为中心
Posted
技术标签:
【中文标题】Pyspark 以多个列名为中心【英文标题】:Pyspark pivot on multiple column names 【发布时间】:2020-08-06 13:58:13 【问题描述】:我目前有一个数据框 df
id | c1 | c2 | c3 |
1 | diff | same | diff
2 | same | same | same
3 | diff | same | same
4 | same | same | same
我希望我的输出看起来像
name| diff | same
c1 | 2 | 2
c2 | 0 | 4
c3 | 1 | 3
当我尝试时:
df.groupby('c2').pivot('c2').count() -> transformation A
|f2 | diff | same |
|same | null | 2
|diff | 2 | null
我假设我需要为每一列编写一个循环并通过转换 A 传递它? 但是我在正确转换 A 时遇到问题。 请帮忙
【问题讨论】:
你还有其他不同的值吗? 你现在可以假设没有 【参考方案1】:Pivot
是一种昂贵的 shuffle
操作,如果可能应该避免。尝试将此逻辑与 arrays_zip and explode
一起使用以动态折叠列 和 groupby-aggregate
。
from pyspark.sql import functions as F
df.withColumn("cols", F.explode(F.arrays_zip(F.array([F.array(F.col(x),F.lit(x))\
for x in df.columns if x!='id']))))\
.withColumn("name", F.col("cols.0")[1]).withColumn("val", F.col("cols.0")[0]).drop("cols")\
.groupBy("name").agg(F.count(F.when(F.col("val")=='diff',1)).alias("diff"),\
F.count(F.when(F.col("val")=='same',1)).alias("same")).orderBy("name").show()
#+----+----+----+
#|name|diff|same|
#+----+----+----+
#| c1| 2| 2|
#| c2| 0| 4|
#| c3| 1| 3|
#+----+----+----+
您也可以通过 exploding a map_type
创建一个 map dynamically
来执行此操作。
from pyspark.sql import functions as F
from itertools import chain
df.withColumn("cols", F.create_map(*(chain(*[(F.lit(name), F.col(name))\
for name in df.columns if name!='id']))))\
.select(F.explode("cols").alias("name","val"))\
.groupBy("name").agg(F.count(F.when(F.col("val")=='diff',1)).alias("diff"),\
F.count(F.when(F.col("val")=='same',1)).alias("same")).orderBy("name").show()
#+----+----+----+
#|name|diff|same|
#+----+----+----+
#| c1| 2| 2|
#| c2| 0| 4|
#| c3| 1| 3|
#+----+----+----+
【讨论】:
【参考方案2】:from pyspark.sql.functions import *
df = spark.createDataFrame([(1,'diff','same','diff'),(2,'same','same','same'),(3,'diff','same','same'),(4,'same','same','same')],['idcol','C1','C2','C3'])
df.createOrReplaceTempView("MyTable")
#spark.sql("select * from MyTable").collect()
x1=spark.sql("select idcol, 'C1' AS col, C1 from MyTable union all select idcol, 'C2' AS col, C2 from MyTable union all select idcol, 'C3' AS col, C3 from MyTable")
#display(x1)
x2=x1.groupBy('col').pivot('C1').agg(count('C1')).orderBy('col')
display(x2)
【讨论】:
您好,欢迎来到堆栈溢出。感谢您发布此答案。您是否有机会编辑答案以解释它的工作原理以及它与其他答案的不同之处?以上是关于Pyspark 以多个列名为中心的主要内容,如果未能解决你的问题,请参考以下文章