Spark RDD - 使用额外参数进行映射

Posted

技术标签:

【中文标题】Spark RDD - 使用额外参数进行映射【英文标题】:Spark RDD - Mapping with extra arguments 【发布时间】:2016-01-06 06:57:30 【问题描述】:

是否可以将额外的参数传递给 pySpark 中的映射函数? 具体来说,我有以下代码配方:

raw_data_rdd = sc.textFile("data.json", use_unicode=True)
json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line))
mapped_rdd = json_data_rdd.flatMap(processDataLine)

processDataLine 函数除了 JSON 对象外还接受额外的参数,如:

def processDataLine(dataline, arg1, arg2)

如何将额外的参数 arg1arg2 传递给 flaMap 函数?

【问题讨论】:

考虑阅读this 感谢@AvihooMamka。据我了解,我需要使用部分功能。但我不知道如何将它应用到我的案例中? 广播后为什么不将processDataLine函数和你想要的参数发送给partial函数呢? 【参考方案1】:

    您可以直接在 flatMap 中使用匿名函数

    json_data_rdd.flatMap(lambda j: processDataLine(j, arg1, arg2))
    

    或咖喱processDataLine

    f = lambda j: processDataLine(dataline, arg1, arg2)
    json_data_rdd.flatMap(f)
    

    你可以像这样生成processDataLine

    def processDataLine(arg1, arg2):
        def _processDataLine(dataline):
            return ... # Do something with dataline, arg1, arg2
        return _processDataLine
    
    json_data_rdd.flatMap(processDataLine(arg1, arg2))
    

    toolz 库提供了有用的curry 装饰器:

    from toolz.functoolz import curry
    
    @curry
    def processDataLine(arg1, arg2, dataline): 
        return ... # Do something with dataline, arg1, arg2
    
    json_data_rdd.flatMap(processDataLine(arg1, arg2))
    

    请注意,我已将 dataline 参数推到最后一个位置。这不是必需的,但这样我们就不必使用关键字 args。

    最后在 cmets 中 Avihoo Mamka 已经提到了 functools.partial

【讨论】:

@guilhermecgs 您可以在本地集合上对此进行基准测试,但显式嵌套(2.)应该是最有效的,其次是使用匿名函数(1.)Currying / partials 可能会稍微慢一些,因为机制要多得多比前两个复杂。并不是说我真的会在这里担心。

以上是关于Spark RDD - 使用额外参数进行映射的主要内容,如果未能解决你的问题,请参考以下文章

Spark RDD与MapReduce

spark数据分区数量的原理

PySpark 将 Dataframe 作为额外参数传递给映射

spark的RDDAPI总结

Spark - 如何使用有状态映射器对已排序的 RDD 进行平面映射?

Spark性能测试报告与调优参数