为此任务编写自定义 AccumulatorParam 的正确方法是啥?

Posted

技术标签:

【中文标题】为此任务编写自定义 AccumulatorParam 的正确方法是啥?【英文标题】:What is the proper way to write a custom AccumulatorParam for this task?为此任务编写自定义 AccumulatorParam 的正确方法是什么? 【发布时间】:2020-03-12 12:54:56 【问题描述】:

背景:使用 Azure Databricks、Python 编程语言、Spark 环境。

我有一个rdd,并创建了一个map 操作。

rdd = sc.parallelize(my_collection)
mapper = rdd.map(lambda val: do_something(val))

假设这个映射器中的元素是Foo 类型。我在驱动程序节点上有一个Bar 类型的全局对象,并且有一个需要从工作节点填充的Foo 对象的内部集合(即mapper 中的元素)。

# This is what I want to do
bar_obj = Bar()

def add_to_bar(foo_obj):
    global bar_obj
    bar_obj.add_foo(foo_obj)

mapper.foreach(add_to_bar)

根据我对 RDD 编程指南的理解,由于闭包在 Spark 中的工作方式,这将不起作用。相反,我应该使用Accumulator 来完成此操作。

我知道我需要以某种方式继承 AccumulatorParam,但我不确定这个类是什么样的,以及在这种情况下如何使用它。

这是我第一次通过:

class FooAccumulator(AccumulatorParam):
  def zero(self, value):
    return value.bar
  def addInPlace(self, value1, value2):
    # bar is the parent Bar object for the value1 Foo instance
    value1.bar.add_foo(value2)
    return value1

但我不确定如何从这里开始。

我还想补充一点,我试图简单地 .collect() 映射器的结果,但这会导致结果集大于驱动程序节点上的最大允许内存(~4G,当上升到10G 它可以运行,但最终会超时)。

【问题讨论】:

【参考方案1】:

我不知道你到目前为止有没有尝试过?我自己发现了这段代码:

    from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
    def zero(self, s):
        return s
    def addInPlace(self, s1, s2):
        return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())

所以也许你可以尝试做这样的事情:

from pyspark import AccumulatorParam

class FooAccumulator(AccumulatorParam):
    def zero(self, f):
        return []
    def addInPlace(self, acc, el):
        acc.extend(el)
        return acc

accumulator = sc.accumulator([], FooAccumulator())

我认为这个thread 对你也有帮助。

【讨论】:

我已经对原始帖子添加了一个编辑,以显示我目前尝试过的内容,但没有任何运气。我相信您提供的FooAccumulator 是相似的,但与我的用例不太一致。再看看我的例子,如果你有任何想法,请告诉我。感谢您提供相关的线程链接。

以上是关于为此任务编写自定义 AccumulatorParam 的正确方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

hive自定义函数开发与部署

. 编写自定义任务类

. 编写自定义任务类

自定义异步任务的实现不起作用

如何编写自定义的 gradle 任务以不忽略 Findbugs 违规但在分析完成后失败

我们如何在 Hive 中为自定义 Writable 类型编写自定义 ObjectInspector?