在 pyspark 中使用 foreach()

Posted

技术标签:

【中文标题】在 pyspark 中使用 foreach()【英文标题】:Using foreach() in pyspark 【发布时间】:2019-12-12 17:02:03 【问题描述】:

我有一个 pyspark DataFrame,其中包含一个名为 primary_use 的列。

这是第一行:

要创建一个布尔向量来指示某一行中的primary_useEducation 还是Office,我正在使用以下代码。但是,它返回 None 会导致异常:

def is_included_in(row):

    return(row['primary_use'] in ['Education', 'Office'])

building.foreach(is_included_in).show()

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-124-03dd626371bf> in <module>
----> 1 building.foreach(is_included_in).show()

AttributeError: 'NoneType' object has no attribute 'show'

您能解释一下结果并提出对代码的更正吗?

【问题讨论】:

【参考方案1】:

pyspark foreach 不会生成新的转换数据帧。 Foreach 允许遍历每条记录并执行一些非返回操作 - 例如写入磁盘,或调用一些外部 api

该函数实际上调用了df.rdd.foreach。 Rdd 是底层数据框 api。是比较低级的。转换每条记录的正确 rdd api 是 Rdd.map

dataframe api 还提供了运行标量映射用户定义函数的可能性。最新的是pandas udf

这样的isin 函数已经是标准 spark sql api 的一部分。

df = df.withColumn('is_included', df.primary_use.isin(['Education', 'Office']))

【讨论】:

以上是关于在 pyspark 中使用 foreach()的主要内容,如果未能解决你的问题,请参考以下文章

pyspark:rdd.foreach(print)报错NameError

pyspark dataframe foreach 填充列表

为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?

使用“--py-files”参数将 PySpark 作业提交到集群

使用 Pyspark 运行 Python 脚本时出现 py4j.protocol.Py4JJavaError [重复]

通过 --py-files 可以在pyspark中可以顺利导入