如何按行将函数应用于 PySpark 数据帧的一组列?
Posted
技术标签:
【中文标题】如何按行将函数应用于 PySpark 数据帧的一组列?【英文标题】:How to apply a function to a set of columns of a PySpark dataframe by rows? 【发布时间】:2020-03-13 17:35:02 【问题描述】:给定一个如下的数据框:
A0 A1 A2 A3
0 9 1 2 8
1 9 7 6 9
2 1 7 4 6
3 0 8 4 8
4 0 1 6 0
5 7 1 4 3
6 6 3 5 9
7 3 3 2 8
8 6 3 0 8
9 3 2 7 1
我需要将一个函数逐行应用于一组列,以使用该函数的结果创建一个新列。
Pandas 中的一个例子是:
df = pd.DataFrame(data=None, columns=['A0', 'A1', 'A2', 'A3'])
df['A0'] = np.random.randint(0, 10, 10)
df['A1'] = np.random.randint(0, 10, 10)
df['A2'] = np.random.randint(0, 10, 10)
df['A3'] = np.random.randint(0, 10, 10)
df['mean'] = df.mean(axis=1)
df['std'] = df.iloc[:, :-1].std(axis=1)
df['any'] = df.iloc[:, :-2].apply(lambda x: np.sum(x), axis=1)
结果是:
A0 A1 A2 A3 mean std any
0 9 1 2 8 5.00 4.082483 20
1 9 7 6 9 7.75 1.500000 31
2 1 7 4 6 4.50 2.645751 18
3 0 8 4 8 5.00 3.829708 20
4 0 1 6 0 1.75 2.872281 7
5 7 1 4 3 3.75 2.500000 15
6 6 3 5 9 5.75 2.500000 23
7 3 3 2 8 4.00 2.708013 16
8 6 3 0 8 4.25 3.500000 17
9 3 2 7 1 3.25 2.629956 13
如何在 PySpark 中做类似的事情?
【问题讨论】:
使用udf spark.apache.org/docs/latest/api/python/…,例如:s_std = udf(lambda x: float(np.std(x)),'float')
然后df.withColumn('std', s_std(array(*df.columns))).show()
看看这个question,它是关于总和的,如果你去看看pyspark.sql,你可能会找到你想做的另一个
【参考方案1】:
对于 Spark 2.4+,您可以使用 aggregate
函数。首先,使用所有数据框列创建数组列values
。然后,像这样计算std
、means
和any
列:
any
:聚合以求和数组元素
mean
:将any
列除以数组大小values
std
:聚合和求和(x - mean) ** 2
,然后除以数组的length - 1
以下是相关代码:
from pyspark.sql.functions import expr, sqrt, size, col, array
data = [
(9, 1, 2, 8), (9, 7, 6, 9), (1, 7, 4, 6),
(0, 8, 4, 8), (0, 1, 6, 0), (7, 1, 4, 3),
(6, 3, 5, 9), (3, 3, 2, 8), (6, 3, 0, 8),
(3, 2, 7, 1)
]
df = spark.createDataFrame(data, ['A0', 'A1', 'A2', 'A3'])
cols = df.columns
df.withColumn("values", array(*cols)) \
.withColumn("any", expr("aggregate(values, 0D, (acc, x) -> acc + x)")) \
.withColumn("mean", col("any") / size(col("values"))) \
.withColumn("std", sqrt(expr("""aggregate(values, 0D,
(acc, x) -> acc + power(x - mean, 2),
acc -> acc / (size(values) -1))"""
)
)) \
.drop("values") \
.show(truncate=False)
#+---+---+---+---+----+----+------------------+
#|A0 |A1 |A2 |A3 |any |mean|std |
#+---+---+---+---+----+----+------------------+
#|9 |1 |2 |8 |20.0|5.0 |4.08248290463863 |
#|9 |7 |6 |9 |31.0|7.75|1.5 |
#|1 |7 |4 |6 |18.0|4.5 |2.6457513110645907|
#|0 |8 |4 |8 |20.0|5.0 |3.8297084310253524|
#|0 |1 |6 |0 |7.0 |1.75|2.8722813232690143|
#|7 |1 |4 |3 |15.0|3.75|2.5 |
#|6 |3 |5 |9 |23.0|5.75|2.5 |
#|3 |3 |2 |8 |16.0|4.0 |2.70801280154532 |
#|6 |3 |0 |8 |17.0|4.25|3.5 |
#|3 |2 |7 |1 |13.0|3.25|2.6299556396765835|
#+---+---+---+---+----+----+------------------+
火花:
您可以使用functools.reduce
和operator.add
对列求和。逻辑同上:
from functools import reduce
from operator import add
df.withColumn("any", reduce(add, [col(c) for c in cols])) \
.withColumn("mean", col("any") / len(cols)) \
.withColumn("std", sqrt(reduce(add, [(col(c) - col("mean")) ** 2 for c in cols]) / (len(cols) -1)))\
.show(truncate=False)
【讨论】:
spark 版本是 2.3.1,“any”列应该表示任何操作,而不是具体的值总和 aggregate()中的0D是什么意思 @Dee 是聚合函数的零值,D
指的是double类型。【参考方案2】:
上面的答案很好,但是我看到 OP 使用的是 Python/PySpark,如果你不理解 Spark SQL 表达式,上面的逻辑不是 100% 清楚的。
我建议使用 Pandas UDAF,与 UDF 不同,这些是矢量化的并且非常高效。这已添加到 Spark API 以降低从 pandas 迁移到 Spark 所需的学习曲线。这也意味着,如果您的大多数同事(例如我的同事)更熟悉 Pandas/Python,那么您的代码更易于维护。
这些是可用的 Pandas UDAF 类型及其对应的 Pandas
例如
SparkUdafType → df.pandasEquivalent(...) works on → returns
SCALAR → df.transform(...), Mapping Series → Series
GROUPED_MAP → df.apply(...) , Group & MapDataFrame → DataFrame
GROUPED_AGG → df.aggregate(...), Reduce Series → Scalar
【讨论】:
以上是关于如何按行将函数应用于 PySpark 数据帧的一组列?的主要内容,如果未能解决你的问题,请参考以下文章
pyspark:如何获取 spark 数据帧的 Spark SQLContext?