如何在pyspark中的聚合函数之后保持列的顺序一致

Posted

技术标签:

【中文标题】如何在pyspark中的聚合函数之后保持列的顺序一致【英文标题】:How to keep order of columns consistent after aggregation functions in pyspark 【发布时间】:2021-04-12 03:16:04 【问题描述】:

我正在尝试创建一个包含多个传感器读数的聚合数据框,其中包含每个传感器的总和。我有很多数据框,但它们都有相同的架构,有 10 列,每个传感器一个:

+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+ 
|sensor_1|sensor_2|sensor_3|sensor_4|sensor_5|sensor_6|sensor_7|sensor_8|sensor_9|sensor_10|
+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+ 
|   220.0|   339.0|  -336.0|   364.0|    null|   492.0|  -796.0|  -423.0|  -582.0|    -40.0| 
|   178.0|   221.0|  -317.0|   366.0|    null|   525.0|  -754.0|  -415.0|  -932.0|   -305.0| 
|   151.0|    42.0|  -280.0|   250.0|    null|   463.0|  -772.0|  -229.0|  -257.0|    -59.0| 
|   162.0|  -123.0|  -243.0|   288.0|    null|   303.0|  -899.0|   212.0|  -295.0|     38.0| 
|   158.0|  -287.0|  -300.0|   372.0|    null|   169.0|  -769.0|   755.0|   169.0|   -239.0| 
|   136.0|  -302.0|  -308.0|   242.0|    null|   241.0|  -510.0|   888.0|   282.0|   -293.0| 
|   124.0|  -131.0|  -292.0|   132.0|    null|   234.0|  -494.0|   970.0|  -326.0|   -203.0| 
|   127.0|   133.0|  -208.0|    14.0|    null|   134.0|  -748.0|   700.0|   237.0|   -278.0| 
|   142.0|   374.0|   -81.0|  -177.0|    null|  -200.0|  -678.0|   402.0|   664.0|   -460.0| 
|   135.0|   538.0|    52.0|  -113.0|    null|  -440.0|  -711.0|    35.0|   877.0|   -452.0|
+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+

可以尝试重新创建一个小版本的数据框

columns = ['sensor_1','sensor_2','sensor_3','sensor_4','sensor_5','sensor_6','sensor_7','sensor_8','sensor_9','sensor_10']
data = [(220.0,   339.0,  -336.0,   364.0,    null,   492.0,  -796.0,  -423.0,  -582.0,    -40.0),
       (178.0,   221.0,  -317.0,   366.0,    null,   525.0,  -754.0,  -415.0,  -932.0,   -305.0),
       (151.0,    42.0,  -280.0,   250.0,    null,   463.0,  -772.0,  -229.0,  -257.0,    -59.0)]

spark = SparkSession.builder.appName('Sensors').getOrCreate()
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF()

在我尝试创建 sums DataFrame 之后:

exprs = x: "sum" for x in df.columns
sum_df = df.agg(exprs)

这给出了以下输出。

+-------------+-------------+-------------+-------------+-------------+-------------+-------------+ 
|sum(sensor_2)|sum(sensor_9)|sum(sensor_3)|sum(sensor_8)|sum(sensor_4)|sum(sensor_7)|sum(sensor_1)|
+-------------+-------------+-------------+-------------+-------------+-------------+-------------+
|     276834.0|      87904.0|     213587.0|      76103.0|     121201.0|     423609.0|     -96621.0|         
+-------------+-------------+-------------+-------------+-------------+-------------+-------------+

如您所见,列与原始列不符。我不得不修剪整个 df 的显示以整齐地适合这篇文章,但你明白了。我不确定火花引擎决定使用什么逻辑顺序,但它不适合我,因为我需要它们具有一致的顺序。为什么这样做?如何保持顺序一致?

【问题讨论】:

我认为由于您使用 dict 创建 expr,因此会导致此重新排序。您可以使用 collections 中的 OrderedDict 来解决此问题。 您可以与 Pyspak DF 一起使用吗? 【参考方案1】:

让我们尝试使用列表推导;

df1=df.agg(*[f.sum(x).alias(x) for x in df.columns])
df1.show()

【讨论】:

这给了我错误 - AttributeError: 'DataFrame' object has no attribute 'sum' 哪个是哪个,你似乎已经接受了答案。无论如何,agg可以应用于df。如果没有您的真实数据框的详细信息,我可以说出错误的原因。我已将其应用于 2m 行 df 并且有效。 我在收到此错误之前接受了它。你能解释一下“f”是什么吗? import pyspark.sql.functions as f 谢谢你清理它。还想知道如何获取别名列名称,如 sum(sensor_1)。

以上是关于如何在pyspark中的聚合函数之后保持列的顺序一致的主要内容,如果未能解决你的问题,请参考以下文章

如何更改pyspark数据框中列的顺序?

Pyspark 将列列表转换为聚合函数

如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?

Pyspark - 一次聚合数据框的所有列[重复]

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

计算 PySpark DataFrame 列的模式?