Pyspark:使用 map 函数而不是 collect 来迭代 RDD
Posted
技术标签:
【中文标题】Pyspark:使用 map 函数而不是 collect 来迭代 RDD【英文标题】:Pyspark: Using map function instead of collect for iterating RDDs 【发布时间】:2017-10-12 13:24:03 【问题描述】:在 PySpark 中,我有两个结构为 (key,list of list) 的 RDD:
input_rdd.take(2)
[(u'100',
[[u'36003165800', u'70309879', u'1']]),
(u'200',
[[u'5196352600', u'194837393', u'99']]) ]
output_rdd.take(2)
[(u'100',
[[u'875000', u'5959', u'1']]),
(u'300', [[u'16107000', u'12428', u'1']])]
现在我想要一个结果 RDD(如下所示),它根据键对两个 RDD 进行分组,并按顺序(键,(,))将输出作为元组提供。以防任何输入中都不存在该键或输出,则该 rdd 的列表保持为空。
[(u'100',
([[[u'36003165800', u'70309879', u'1']]],
[[[u'875000', u'5959', u'1']]]),
(u'200',
([[[u'5196352600', u'194837393', u'99']]],
[])),
(u'300',([],[[[u'16107000', u'12428', u'1']]])
]
为了获得结果 RDD,我使用下面的代码使用
resultant=sc.parallelize(x, tuple(map(list, y))) for x,y in sorted(list(input_rdd.groupWith(output_rdd).collect()))
有没有办法我可以删除 .collect() 并使用带有 groupWith 函数的 .map() 来在 Pyspark 中获得相同的结果 RDD?
【问题讨论】:
给出空的rdd 【参考方案1】:完全外连接给出:
input_rdd.fullOuterJoin(output_rdd).collect()
# [(u'200', ([[u'5196352600', u'194837393', u'99']], None)),
# (u'300', (None, [[u'16107000', u'12428', u'1']])),
# (u'100', ([[u'36003165800', u'70309879', u'1']], [[u'875000', u'5959', u'1']]))]
用[]
替换None
:
input_rdd.fullOuterJoin(output_rdd).map(lambda x: (x[0], tuple(i if i is not None else [] for i in x[1]))).collect()
# [(u'200', ([[u'5196352600', u'194837393', u'99']], [])),
# (u'300', ([], [[u'16107000', u'12428', u'1']])),
# (u'100', ([[u'36003165800', u'70309879', u'1']], [[u'875000', u'5959', u'1']]))]
【讨论】:
以上是关于Pyspark:使用 map 函数而不是 collect 来迭代 RDD的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark 和错误“TypeError:必须是实数,而不是 Column”,当尝试在窗口上使用定义的函数查找指南针方位时
TypeError:元组索引必须是整数,而不是使用 pyspark 和 RDD 的 str