为此任务编写自定义 AccumulatorParam 的正确方法是啥?
Posted
技术标签:
【中文标题】为此任务编写自定义 AccumulatorParam 的正确方法是啥?【英文标题】:What is the proper way to write a custom AccumulatorParam for this task?为此任务编写自定义 AccumulatorParam 的正确方法是什么? 【发布时间】:2020-03-12 12:54:56 【问题描述】:背景:使用 Azure Databricks、Python 编程语言、Spark 环境。
我有一个rdd
,并创建了一个map
操作。
rdd = sc.parallelize(my_collection)
mapper = rdd.map(lambda val: do_something(val))
假设这个映射器中的元素是Foo
类型。我在驱动程序节点上有一个Bar
类型的全局对象,并且有一个需要从工作节点填充的Foo
对象的内部集合(即mapper
中的元素)。
# This is what I want to do
bar_obj = Bar()
def add_to_bar(foo_obj):
global bar_obj
bar_obj.add_foo(foo_obj)
mapper.foreach(add_to_bar)
根据我对 RDD 编程指南的理解,由于闭包在 Spark 中的工作方式,这将不起作用。相反,我应该使用Accumulator
来完成此操作。
我知道我需要以某种方式继承 AccumulatorParam
,但我不确定这个类是什么样的,以及在这种情况下如何使用它。
这是我第一次通过:
class FooAccumulator(AccumulatorParam):
def zero(self, value):
return value.bar
def addInPlace(self, value1, value2):
# bar is the parent Bar object for the value1 Foo instance
value1.bar.add_foo(value2)
return value1
但我不确定如何从这里开始。
我还想补充一点,我试图简单地 .collect()
映射器的结果,但这会导致结果集大于驱动程序节点上的最大允许内存(~4G,当上升到10G 它可以运行,但最终会超时)。
【问题讨论】:
【参考方案1】:我不知道你到目前为止有没有尝试过?我自己发现了这段代码:
from pyspark import AccumulatorParam
class StringAccumulator(AccumulatorParam):
def zero(self, s):
return s
def addInPlace(self, s1, s2):
return s1 + s2
accumulator = sc.accumulator("", StringAccumulator())
所以也许你可以尝试做这样的事情:
from pyspark import AccumulatorParam
class FooAccumulator(AccumulatorParam):
def zero(self, f):
return []
def addInPlace(self, acc, el):
acc.extend(el)
return acc
accumulator = sc.accumulator([], FooAccumulator())
我认为这个thread 对你也有帮助。
【讨论】:
我已经对原始帖子添加了一个编辑,以显示我目前尝试过的内容,但没有任何运气。我相信您提供的FooAccumulator
是相似的,但与我的用例不太一致。再看看我的例子,如果你有任何想法,请告诉我。感谢您提供相关的线程链接。以上是关于为此任务编写自定义 AccumulatorParam 的正确方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章