从 Pyspark 中的 RDD 中提取字典

Posted

技术标签:

【中文标题】从 Pyspark 中的 RDD 中提取字典【英文标题】:Extracting a dictionary from an RDD in Pyspark 【发布时间】:2015-06-23 15:02:02 【问题描述】:

这是一道作业题:

我有一个RDD,它是一个 os 元组的集合。我也有从每个输入元组返回字典的函数。不知何故,与 reduce 函数相反。

使用 map,我可以轻松地从 RDD 的元组转到 RDD 的字典。但是,由于字典是 (key, value) 对的集合,我想将字典的 RDD 转换为每个字典内容的 (key, value) 元组的 RDD

这样,如果我的RDD 包含 10 个元组,那么我会得到一个 RDD,其中包含 10 个具有 5 个元素的字典(例如),最后我会得到一个包含 50 个元组的 RDD

我认为这必须是可能的,但是,如何? (可能问题是我不知道这个操作英文怎么叫)

【问题讨论】:

【参考方案1】:

我的 2 美分:

有一个名为“collectAsMap”的 PairRDD 函数,它从 RDD 返回一个字典。

让我给你看一个例子:

sample = someRDD.sample(0, 0.0001, 0)
sample_dict = sample.collectAsMap()
print sample.collect()
print sample_dict

[('hi', 4123.0)]
'hi': 4123.0

文档here

希望对您有所帮助! 问候!

【讨论】:

非常方便。谢谢。请注意,这不适用于仅是列表的 RDD。它需要是如上所示的元组。 这将是一个更清晰的例子:dicts = sc.parallelize(["foo": 1, "bar": 2, "foo": 3, "baz": -1, "bar": 5]); print(result.collect()); print(result.collectAsMap()) 而这个 sn-p,和你的一样,显示了 .collect().collectAsMap() 之间的区别。【参考方案2】:

我猜你想要的只是一个flatMap

dicts = sc.parallelize(["foo": 1, "bar": 2, "foo": 3, "baz": -1, "bar": 5])
dicts.flatMap(lambda x: x.items())

flatMap 从 RDD 的元素中获取一个函数到可迭代的,然后连接结果。 Spark 上下文之外的同类型操作的另一个名称是mapcat

>>> from toolz.curried import map, mapcat, concat, pipe
>>> from itertools import repeat
>>> pipe(range(4), mapcat(lambda i: repeat(i, i + 1)), list)
[0, 1, 1, 2, 2, 2, 3, 3, 3, 3]

或者一步一步来:

>>> pipe(range(4), map(lambda i: repeat(i, i + 1)), concat, list)
[0, 1, 1, 2, 2, 2, 3, 3, 3, 3]

同样的事情使用itertools.chain

>>> from itertools import chain
>>> pipe((repeat(i, i + 1) for i in  range(4)), chain.from_iterable, list)
>>> [0, 1, 1, 2, 2, 2, 3, 3, 3, 3]

【讨论】:

当然!你能解释一下flatMap和map有什么区别吗?我不认为 flatMap 适合这些情况...... flatMapapplies 一个函数,该函数将一个集合返回到此 RDD 的所有元素,然后将结果展平。看一下简化后的 Scala Rdd.flatMap 签名,如下所示:(f: (T) ⇒ TraversableOnce[U]): RDD[U] 我添加了一些非 Spark 示例。我希望这些会有所帮助。

以上是关于从 Pyspark 中的 RDD 中提取字典的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark - 使用广播字典中的日期过滤 RDD

如何在 PySpark 中获得不同的字典 RDD?

如何从 Pyspark 中的 RDD 中过滤

从 PySpark 中的 RDD 中的数据中查找最小和最大日期

Python pyspark 将 DF 写入 .csv 并存储在本地 C 盘

删除 RDD、Pyspark 中的停用词