在 PySpark 中使用 rdd.map 解压和编码字符串

Posted

技术标签:

【中文标题】在 PySpark 中使用 rdd.map 解压和编码字符串【英文标题】:Unpickling and encoding a string using rdd.map in PySpark 【发布时间】:2018-09-14 13:27:08 【问题描述】:

我需要将代码从 PySpark 1.3 移植到 2.3(也仅限 Python 2.7),并且我在 rdd 上有以下映射转换:

import cPickle as pickle
import base64

path = "my_filename"

my_rdd = "rdd with data" # pyspark.rdd.PipelinedRDD()

# saving RDD to a file but first encoding everything
my_rdd.map(lambda line: base64.b64encode(pickle.dumps(line))).saveAsTextFile(path)

# another my_rdd.map doing the opposite of the above, fails with the same error
my_rdd = sc.textFile(path).map(lambda line: pickle.loads(base64.b64decode(line)))

运行此部分时,出现以下错误:

   raise pickle.PicklingError(msg)
PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

map 函数中似乎不再允许此类操作。任何建议如何潜在地重写这部分?

更新:

很奇怪,只是在做:

my_rdd.saveAsTextFile(path)

同样失败。

【问题讨论】:

请提供构造 my_rdd 的代码,包括您正在对其进行的所有转换。 @SergeyKhudyakov 所以错误是在转换过程中的某个地方?问题是,代码太大并且没有记录,有没有办法可以强制 PySpark 指向失败的实际行,而不是指向 rdd 上的最后一个操作? 是的,我没有发现您发布的代码有任何问题。最有可能的是,您的代码中有一个引用 RDD 的转换函数,这正是异常告诉您的。如果没有看到完整的代码,我认为没有人能够帮助你。实际行 - 您可以尝试插入调试语句,例如简单的str(rdd),在你正在做的转换之间。 【参考方案1】:

归根结底,问题出在进行转换的函数的深处。在这种情况下,重写比调试更容易。

【讨论】:

以上是关于在 PySpark 中使用 rdd.map 解压和编码字符串的主要内容,如果未能解决你的问题,请参考以下文章

Dataframe.rdd.map().collect 在 PySpark 中不起作用 [重复]

pyspark rdd map 没有调用函数

pyspark MLlib踩坑之model predict+rdd map zip

在pyspark中过滤两个RDD

使用 pyspark 交叉组合两个 RDD

如何使用 Pyspark 将一个 rdd 映射到另一个?