如何在 PySpark 中获得不同的字典 RDD?

Posted

技术标签:

【中文标题】如何在 PySpark 中获得不同的字典 RDD?【英文标题】:How can I get a distinct RDD of dicts in PySpark? 【发布时间】:2016-02-19 16:19:44 【问题描述】:

我有一个字典的 RDD,我想获得一个仅包含不同元素的 RDD。但是,当我尝试打电话时

rdd.distinct()

PySpark 给我以下错误

TypeError: unhashable type: 'dict'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/02/19 16:55:56 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 1776, in combineLocally
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'dict'

我在 dict 中确实有一个可以用作不同元素的键,但文档没有提供任何有关如何解决此问题的线索。

编辑:内容由字符串、字符串数组和数字字典组成

编辑 2: 字典示例...我希望将具有相等“data_fingerprint”键的字典视为相等:

"id":"4eece341","data_fingerprint":"1707db7bddf011ad884d132bf80baf3c"

谢谢

【问题讨论】:

字典的内容到底是什么?你想如何散列这些? 回答了问题 这还不够。您需要一个准确的策略来比较字典。这些是不可散列的,原因有两个:可变性和未定义的顺序。在您的情况下,情况甚至更糟,因为它也包含不可散列的值。那么问题是什么让字典对你来说是平等的? 哦,好吧..我在字典内容中添加了一个“data_fingerprint”键,相等的字典有相等的键。有问题更新的示例 【参考方案1】:

正如@zero323 在他的评论中指出的那样,您必须决定如何比较字典,因为它们不可散列。一种方法是对键进行排序(因为它们没有任何特定的顺序),例如按字典顺序。然后创建一个字符串形式:

def dict_to_string(dict):
    ...
    return 'key1|value1|key2|value2...|keyn|valuen'

如果您有嵌套的不可散列对象,则必须递归地执行此操作。

现在您可以将您的 RDD 转换为与字符串配对作为键(或某种哈希)

pairs = dictRDD.map(lambda d: (dict_to_string(d), d))

为了得到你想要的,你只需要在休闲时通过按键减少

distinctDicts = pairs.reduceByKey(lambda val1, val2: val1).values()

【讨论】:

【参考方案2】:

由于您的数据提供了唯一键,您可以简单地执行以下操作:

(rdd
    .keyBy(lambda d: d.get("data_fingerprint"))
    .reduceByKey(lambda x, y: x)
    .values())

Python 字典至少存在两个问题,这使得它们不适合散列:

可变性 - 这使得任何散列变得棘手 任意键顺序

前段时间有一个 PEP 提议 frozerdicts (PEP 0416) 但最终被拒绝了。

【讨论】:

以上是关于如何在 PySpark 中获得不同的字典 RDD?的主要内容,如果未能解决你的问题,请参考以下文章

在 pyspark 中使用 RDD 从字典创建数据框

Pyspark - 使用广播字典中的日期过滤 RDD

pyspark:获取列表值的不同元素

在 Pyspark 中将字典转换为数据框

Pyspark - 将rdd转换为数据框时数据设置为null

PySpark - RDD 到 JSON