Pandas 到 pyspark cumprod 函数
Posted
技术标签:
【中文标题】Pandas 到 pyspark cumprod 函数【英文标题】:Pandas to pyspark cumprod function 【发布时间】:2019-12-06 11:59:54 【问题描述】:我正在尝试将以下 pandas 代码转换为 pyspark
Python Pandas 代码:
df = spark.createDataFrame([(1, 1,0.9), (1, 2,0.13), (1, 3,0.5), (1, 4,1.0), (1, 5,0.6)], ['col1', 'col2','col3'])
pandas_df = df.toPandas()
pandas_df['col4'] = (pandas_df.groupby(['col1','col2'])['col3'].apply(lambda x: (1 - x).cumprod()))
pandas_df
结果如下:
col1 col2 col3 col4
0 1 1 0.90 0.10
1 1 2 0.13 0.87
2 1 3 0.50 0.50
3 1 4 1.00 0.00
4 1 5 0.60 0.40
和转换后的火花代码:
from pyspark.sql import functions as F, Window, types
from functools import reduce
from operator import mul
df = spark.createDataFrame([(1, 1,0.9), (1, 2,0.13), (1, 3,0.5), (1, 4,1.0), (1, 5,0.6)], ['col1', 'col2','col3'])
partition_column = ['col1','col2']
window = Window.partitionBy(partition_column)
expr = 1.0 - F.col('col3')
mul_udf = F.udf(lambda x: reduce(mul, x), types.DoubleType())
df = df.withColumn('col4', mul_udf(F.collect_list(expr).over(window)))
df.orderBy('col2').show()
及其输出
+----+----+----+-------------------+
|col1|col2|col3| col4|
+----+----+----+-------------------+
| 1| 1| 0.9|0.09999999999999998|
| 1| 2|0.13| 0.87|
| 1| 3| 0.5| 0.5|
| 1| 4| 1.0| 0.0|
| 1| 5| 0.6| 0.4|
+----+----+----+-------------------+
我不完全了解 pandas 的工作原理,有人可以帮我验证上述转换是否正确,而且我正在使用 UDF,这会降低性能,pyspark 中是否有可用的分布式函数 @987654326 @?
提前致谢
【问题讨论】:
对于spark 2.4+,可以使用aggregate
【参考方案1】:
由于正数的乘积可以用log
和exp
函数(a*b*c = exp(log(a) + log(b) + log(c))
)表示,因此您可以仅使用 Spark 内置函数计算累积乘积:
df.groupBy("col1", "col2") \
.agg(max(col("col3")).alias("col3"),
coalesce(exp(sum(log(lit(1) - col("col3")))), lit(0)).alias("col4")
)\
.orderBy(col("col2"))\
.show()
【讨论】:
谢谢,这完全符合我的要求。我的数据比较庞大,每组大约有 100 万条记录。我将运行代码,看看它是如何执行的。以上是关于Pandas 到 pyspark cumprod 函数的主要内容,如果未能解决你的问题,请参考以下文章
cumsum累计函数系列:pd.cumsum()pd.cumprod()pd.cummax()pd.cummin()
Pandas to PySpark给出OOM错误而不是溢出到磁盘[重复]