带有额外参数 pyspark 的映射分区

Posted

技术标签:

【中文标题】带有额外参数 pyspark 的映射分区【英文标题】:map-partition with extra parameters pyspark 【发布时间】:2016-11-16 23:38:28 【问题描述】:

我想从 mappartition 向 python 函数传递一些额外的参数。任何建议..

我的示例代码如下所示

 def test(x,abc):
   <<code>>

 abc =1234
 df = df.repartition("key")
 res= df.rdd.mapPartitions(test, abc)

如果我将 abc 作为参数传递并在测试函数中使用它,我会遇到错误

例外:您似乎正在尝试广播 RDD 或从操作或转换中引用 RDD。 RDD 转换和动作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(lambda x: rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。有关详细信息,请参阅 SPARK-5063。

Mariusz 请找零

from pyspark.sql import Row
def test(abc):
    def my_map_partitions(x):
       print("----------start-----------")
       cnt=1
       ret = []
       for i in x:
         cnt=cnt+1
         val = Row(key1=i.key1, key2=i.key2, cnt=cnt)
         ret.append(val)
       return ret 
    return my_map_partitions
df = df.repartition("key1key2").sortWithinPartitions("key1key2")  
abc123 = df .rdd.mapPartitions(test(abc)) 

【问题讨论】:

【参考方案1】:

尝试创建返回函数的函数,例如:

def test(abc):
    def my_map_partitions(partition):
        ...do something with partition and abc...
    return my_map_partitions

df.rdd.mapPartitions(test(abc))

【讨论】:

我收到此错误。TypeError: 'NoneType' object is not callable 在什么功能。您的 my_map_partitions 是否返回可迭代变量(如列表?) Mariusz .. 我在原始帖子中添加了代码.. 对于每个分区,我会将行列表转换为列表列表并应用 julia lme 然后将行列表返回给调用者 您的代码中缺少return my_map_partitions。函数测试用于“生成” my_map_partitions 函数,因此需要返回它。 感谢 Mariusz .. 成功了。但是我看到很多空分区,我们如何才能摆脱它。你想让我检查列表的长度并忽略空吗?或任何其他解决方案?

以上是关于带有额外参数 pyspark 的映射分区的主要内容,如果未能解决你的问题,请参考以下文章

Spark RDD - 使用额外参数进行映射

发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询

在 pyspark RDD 上显示分区

使用 Jupyter Notebook 为 PySpark 内核设置 spark.app.name

带有 hive 的 pyspark - 无法正确创建分区并从数据框中保存表

在 PySpark 中涉及带有管道的子进程的映射步骤失败