将用户定义的函数应用于 PySpark 数据帧并返回字典

Posted

技术标签:

【中文标题】将用户定义的函数应用于 PySpark 数据帧并返回字典【英文标题】:Applying a user defined function to a PySpark dataframe and return a dictionary 【发布时间】:2017-08-25 10:49:01 【问题描述】:

假设我有一个名为 df 的 pandas 数据框

id value1 value2
1 2 1
2 2 1
3 4 5

在纯 Python 中,我编写了一个函数来处理这个数据帧并返回一个字典:

d = dict()
for row in df.itertuples()
   x = do_something (row)
   d[x[0]] = x[1:]

我正在尝试使用 Spark 重新实现此功能。

d = dict() # define a global var
def do_something (id, value1, value2):
   # business logic
   d[x0] = [x1,x2,x3]
   return 0
udf_do = udf (do_something)

然后:

df_spark.select (udf_do ('id','value1','value2'))

我的想法是,通过调用df_spark.select,函数do_something 将在数据帧上被调用,它会更新全局变量d。我并不关心udf_do 的返回值,所以我返回0。

确实,我的解决方案不起作用。

您能否建议我一些迭代方式(我知道这不是 Spark 方式)或以某种方式处理 Spark 数据帧并更新外部字典?

请注意,数据框非常大。我尝试通过调用 toPandas() 将其转换为 pandas,但我遇到了 OOM 问题。

【问题讨论】:

看来你正在寻找类似answer的东西 【参考方案1】:

UDF 无法更新任何全局状态。但是,您可以在 UDF 中进行一些业务登录,然后使用toLocalIterator 以节省内存的方式(逐个分区)将所有数据获取到驱动程序。例如:

df = spark.createDataFrame([(10, 'b'), (20, 'b'), (30, 'c'), 
                            (40, 'c'), (50, 'c'), (60, 'a')], ['col1', 'col2'])
df.withColumn('udf_result', ......)
df.cache()
df.count() # force cache fill

for row in df.toLocalIterator():
    print(row)

【讨论】:

以上是关于将用户定义的函数应用于 PySpark 数据帧并返回字典的主要内容,如果未能解决你的问题,请参考以下文章

在 pyspark 中应用用户定义的聚合函数的替代方法

迭代地子集数据帧并使用 R 应用于绘图函数

PySpark - 遍历每一行数据帧并运行配置单元查询

将 PySpark 命令转换为自定义函数

Pyspark:内部连接两个 pyspark 数据帧并从第一个数据帧中选择所有列,从第二个数据帧中选择几列

python 将yaml文件中定义的过滤器应用于PySpark数据帧