如何在pyspark中的聚合函数之后保持列的顺序一致
Posted
技术标签:
【中文标题】如何在pyspark中的聚合函数之后保持列的顺序一致【英文标题】:How to keep order of columns consistent after aggregation functions in pyspark 【发布时间】:2021-04-12 03:16:04 【问题描述】:我正在尝试创建一个包含多个传感器读数的聚合数据框,其中包含每个传感器的总和。我有很多数据框,但它们都有相同的架构,有 10 列,每个传感器一个:
+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+
|sensor_1|sensor_2|sensor_3|sensor_4|sensor_5|sensor_6|sensor_7|sensor_8|sensor_9|sensor_10|
+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+
| 220.0| 339.0| -336.0| 364.0| null| 492.0| -796.0| -423.0| -582.0| -40.0|
| 178.0| 221.0| -317.0| 366.0| null| 525.0| -754.0| -415.0| -932.0| -305.0|
| 151.0| 42.0| -280.0| 250.0| null| 463.0| -772.0| -229.0| -257.0| -59.0|
| 162.0| -123.0| -243.0| 288.0| null| 303.0| -899.0| 212.0| -295.0| 38.0|
| 158.0| -287.0| -300.0| 372.0| null| 169.0| -769.0| 755.0| 169.0| -239.0|
| 136.0| -302.0| -308.0| 242.0| null| 241.0| -510.0| 888.0| 282.0| -293.0|
| 124.0| -131.0| -292.0| 132.0| null| 234.0| -494.0| 970.0| -326.0| -203.0|
| 127.0| 133.0| -208.0| 14.0| null| 134.0| -748.0| 700.0| 237.0| -278.0|
| 142.0| 374.0| -81.0| -177.0| null| -200.0| -678.0| 402.0| 664.0| -460.0|
| 135.0| 538.0| 52.0| -113.0| null| -440.0| -711.0| 35.0| 877.0| -452.0|
+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+
可以尝试重新创建一个小版本的数据框
columns = ['sensor_1','sensor_2','sensor_3','sensor_4','sensor_5','sensor_6','sensor_7','sensor_8','sensor_9','sensor_10']
data = [(220.0, 339.0, -336.0, 364.0, null, 492.0, -796.0, -423.0, -582.0, -40.0),
(178.0, 221.0, -317.0, 366.0, null, 525.0, -754.0, -415.0, -932.0, -305.0),
(151.0, 42.0, -280.0, 250.0, null, 463.0, -772.0, -229.0, -257.0, -59.0)]
spark = SparkSession.builder.appName('Sensors').getOrCreate()
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF()
在我尝试创建 sums DataFrame 之后:
exprs = x: "sum" for x in df.columns
sum_df = df.agg(exprs)
这给出了以下输出。
+-------------+-------------+-------------+-------------+-------------+-------------+-------------+
|sum(sensor_2)|sum(sensor_9)|sum(sensor_3)|sum(sensor_8)|sum(sensor_4)|sum(sensor_7)|sum(sensor_1)|
+-------------+-------------+-------------+-------------+-------------+-------------+-------------+
| 276834.0| 87904.0| 213587.0| 76103.0| 121201.0| 423609.0| -96621.0|
+-------------+-------------+-------------+-------------+-------------+-------------+-------------+
如您所见,列与原始列不符。我不得不修剪整个 df 的显示以整齐地适合这篇文章,但你明白了。我不确定火花引擎决定使用什么逻辑顺序,但它不适合我,因为我需要它们具有一致的顺序。为什么这样做?如何保持顺序一致?
【问题讨论】:
我认为由于您使用 dict 创建 expr,因此会导致此重新排序。您可以使用 collections 中的 OrderedDict 来解决此问题。 您可以与 Pyspak DF 一起使用吗? 【参考方案1】:让我们尝试使用列表推导;
df1=df.agg(*[f.sum(x).alias(x) for x in df.columns])
df1.show()
【讨论】:
这给了我错误 -AttributeError: 'DataFrame' object has no attribute 'sum'
哪个是哪个,你似乎已经接受了答案。无论如何,agg可以应用于df。如果没有您的真实数据框的详细信息,我可以说出错误的原因。我已将其应用于 2m 行 df 并且有效。
我在收到此错误之前接受了它。你能解释一下“f”是什么吗?
import pyspark.sql.functions as f
谢谢你清理它。还想知道如何获取别名列名称,如 sum(sensor_1)。以上是关于如何在pyspark中的聚合函数之后保持列的顺序一致的主要内容,如果未能解决你的问题,请参考以下文章
如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?