如何按行将函数应用于 PySpark 数据帧的一组列?

Posted

技术标签:

【中文标题】如何按行将函数应用于 PySpark 数据帧的一组列?【英文标题】:How to apply a function to a set of columns of a PySpark dataframe by rows? 【发布时间】:2020-03-13 17:35:02 【问题描述】:

给定一个如下的数据框:

   A0  A1  A2  A3
0   9   1   2   8
1   9   7   6   9
2   1   7   4   6
3   0   8   4   8
4   0   1   6   0
5   7   1   4   3
6   6   3   5   9
7   3   3   2   8
8   6   3   0   8
9   3   2   7   1

我需要将一个函数逐行应用于一组列,以使用该函数的结果创建一个新列。

Pandas 中的一个例子是:

df = pd.DataFrame(data=None, columns=['A0', 'A1', 'A2', 'A3'])
df['A0'] = np.random.randint(0, 10, 10)
df['A1'] = np.random.randint(0, 10, 10)
df['A2'] = np.random.randint(0, 10, 10)
df['A3'] = np.random.randint(0, 10, 10)

df['mean'] = df.mean(axis=1)
df['std'] = df.iloc[:, :-1].std(axis=1)
df['any'] = df.iloc[:, :-2].apply(lambda x: np.sum(x), axis=1)

结果是:

   A0  A1  A2  A3  mean       std  any
0   9   1   2   8  5.00  4.082483   20
1   9   7   6   9  7.75  1.500000   31
2   1   7   4   6  4.50  2.645751   18
3   0   8   4   8  5.00  3.829708   20
4   0   1   6   0  1.75  2.872281    7
5   7   1   4   3  3.75  2.500000   15
6   6   3   5   9  5.75  2.500000   23
7   3   3   2   8  4.00  2.708013   16
8   6   3   0   8  4.25  3.500000   17
9   3   2   7   1  3.25  2.629956   13

如何在 PySpark 中做类似的事情?

【问题讨论】:

使用udf spark.apache.org/docs/latest/api/python/…,例如:s_std = udf(lambda x: float(np.std(x)),'float') 然后df.withColumn('std', s_std(array(*df.columns))).show() 看看这个question,它是关于总和的,如果你去看看pyspark.sql,你可能会找到你想做的另一个 【参考方案1】:

对于 Spark 2.4+,您可以使用 aggregate 函数。首先,使用所有数据框列创建数组列values。然后,像这样计算stdmeansany 列:

any:聚合以求和数组元素 mean:将any 列除以数组大小values std:聚合和求和(x - mean) ** 2,然后除以数组的length - 1

以下是相关代码:

from pyspark.sql.functions import expr, sqrt, size, col, array

data = [
    (9, 1, 2, 8), (9, 7, 6, 9), (1, 7, 4, 6),
    (0, 8, 4, 8), (0, 1, 6, 0), (7, 1, 4, 3),
    (6, 3, 5, 9), (3, 3, 2, 8), (6, 3, 0, 8),
    (3, 2, 7, 1)
]
df = spark.createDataFrame(data, ['A0', 'A1', 'A2', 'A3'])

cols = df.columns

df.withColumn("values", array(*cols)) \
  .withColumn("any", expr("aggregate(values, 0D, (acc, x) -> acc + x)")) \
  .withColumn("mean", col("any") / size(col("values"))) \
  .withColumn("std", sqrt(expr("""aggregate(values, 0D, 
                                           (acc, x) -> acc + power(x - mean, 2), 
                                           acc -> acc / (size(values) -1))"""
                              )
                         )) \
  .drop("values") \
  .show(truncate=False)

#+---+---+---+---+----+----+------------------+
#|A0 |A1 |A2 |A3 |any |mean|std               |
#+---+---+---+---+----+----+------------------+
#|9  |1  |2  |8  |20.0|5.0 |4.08248290463863  |
#|9  |7  |6  |9  |31.0|7.75|1.5               |
#|1  |7  |4  |6  |18.0|4.5 |2.6457513110645907|
#|0  |8  |4  |8  |20.0|5.0 |3.8297084310253524|
#|0  |1  |6  |0  |7.0 |1.75|2.8722813232690143|
#|7  |1  |4  |3  |15.0|3.75|2.5               |
#|6  |3  |5  |9  |23.0|5.75|2.5               |
#|3  |3  |2  |8  |16.0|4.0 |2.70801280154532  |
#|6  |3  |0  |8  |17.0|4.25|3.5               |
#|3  |2  |7  |1  |13.0|3.25|2.6299556396765835|
#+---+---+---+---+----+----+------------------+

火花:

您可以使用functools.reduceoperator.add 对列求和。逻辑同上:

from functools import reduce
from operator import add

df.withColumn("any", reduce(add, [col(c) for c in cols])) \
  .withColumn("mean", col("any") / len(cols)) \
  .withColumn("std", sqrt(reduce(add, [(col(c) - col("mean")) ** 2 for c in cols]) / (len(cols) -1)))\
  .show(truncate=False)

【讨论】:

spark 版本是 2.3.1,“any”列应该表示任何操作,而不是具体的值总和 aggregate()中的0D是什么意思 @Dee 是聚合函数的零值,D 指的是double类型。【参考方案2】:

上面的答案很好,但是我看到 OP 使用的是 Python/PySpark,如果你不理解 Spark SQL 表达式,上面的逻辑不是 100% 清楚的。

我建议使用 Pandas UDAF,与 UDF 不同,这些是矢量化的并且非常高效。这已添加到 Spark API 以降低从 pandas 迁移到 Spark 所需的学习曲线。这也意味着,如果您的大多数同事(例如我的同事)更熟悉 Pandas/Python,那么您的代码更易于维护。

这些是可用的 Pandas UDAF 类型及其对应的 Pandas

例如

SparkUdafType → df.pandasEquivalent(...) works on → returns

SCALAR → df.transform(...), Mapping Series → Series

GROUPED_MAP → df.apply(...) , Group & MapDataFrame → DataFrame 

GROUPED_AGG → df.aggregate(...), Reduce Series → Scalar

【讨论】:

以上是关于如何按行将函数应用于 PySpark 数据帧的一组列?的主要内容,如果未能解决你的问题,请参考以下文章

pyspark:如何获取 spark 数据帧的 Spark SQLContext?

熊猫数据帧的 PySpark rdd

Pyspark - 如何将多个数据帧的列连接成一个数据帧的列

根据时间频率将特定函数应用于数据帧的某个子集

应用模型后Pyspark提取转换数据帧的概率[重复]

如何根据来自其他 pyspark 数据帧的日期值过滤第二个 pyspark 数据帧?