Pyspark:使用 map 函数而不是 collect 来迭代 RDD

Posted

技术标签:

【中文标题】Pyspark:使用 map 函数而不是 collect 来迭代 RDD【英文标题】:Pyspark: Using map function instead of collect for iterating RDDs 【发布时间】:2017-10-12 13:24:03 【问题描述】:

在 PySpark 中,我有两个结构为 (key,list of list) 的 RDD:

input_rdd.take(2) 
[(u'100',
  [[u'36003165800', u'70309879', u'1']]),
 (u'200',
  [[u'5196352600', u'194837393', u'99']]) ]   

output_rdd.take(2)
[(u'100', 
   [[u'875000', u'5959', u'1']]),
 (u'300', [[u'16107000', u'12428', u'1']])]

现在我想要一个结果 RDD(如下所示),它根据键对两个 RDD 进行分组,并按顺序(键,(,))将输出作为元组提供。以防任何输入中都不存在该键或输出,则该 rdd 的列表保持为空。

[(u'100',
 ([[[u'36003165800', u'70309879', u'1']]],
 [[[u'875000', u'5959', u'1']]]),
 (u'200',
 ([[[u'5196352600', u'194837393', u'99']]],
  [])),
 (u'300',([],[[[u'16107000', u'12428', u'1']]])
 ]

为了获得结果 RDD,我使用下面的代码使用

 resultant=sc.parallelize(x, tuple(map(list, y))) for x,y in sorted(list(input_rdd.groupWith(output_rdd).collect()))

有没有办法我可以删除 .collect() 并使用带有 groupWith 函数的 .map() 来在 Pyspark 中获得相同的结果 RDD?

【问题讨论】:

给出空的rdd 【参考方案1】:

完全外连接给出:

input_rdd.fullOuterJoin(output_rdd).collect()
# [(u'200', ([[u'5196352600', u'194837393', u'99']], None)), 
#  (u'300', (None, [[u'16107000', u'12428', u'1']])), 
#  (u'100', ([[u'36003165800', u'70309879', u'1']], [[u'875000', u'5959', u'1']]))]

[]替换None

input_rdd.fullOuterJoin(output_rdd).map(lambda x: (x[0], tuple(i if i is not None else [] for i in x[1]))).collect()

# [(u'200', ([[u'5196352600', u'194837393', u'99']], [])), 
#  (u'300', ([], [[u'16107000', u'12428', u'1']])), 
#  (u'100', ([[u'36003165800', u'70309879', u'1']], [[u'875000', u'5959', u'1']]))]

【讨论】:

以上是关于Pyspark:使用 map 函数而不是 collect 来迭代 RDD的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 和错误“TypeError:必须是实数,而不是 Column”,当尝试在窗口上使用定义的函数查找指南针方位时

如何使用 map 函数正确并行运行 pyspark 代码

pyspark中的多个MAP函数

TypeError:元组索引必须是整数,而不是使用 pyspark 和 RDD 的 str

使用 map/filter 在 Pyspark 中的 RDD 中查找最大元素

Pandas to PySpark给出OOM错误而不是溢出到磁盘[重复]