如何在pyspark中使用reduceByKey作为多键和单值[重复]

Posted

技术标签:

【中文标题】如何在pyspark中使用reduceByKey作为多键和单值[重复]【英文标题】:How can I use reduceByKey in pyspark for a multikey and single value [duplicate] 【发布时间】:2017-12-30 10:25:58 【问题描述】:

我在 Ubuntu 上使用 jupyter。

所以我遇到了下一个问题,这是我的代码:

from pyspark import SparkContext
 sc = SparkContext.getOrCreate()
 ut = sc.textFile("hdfs://localhost:54310/hduser/firstnames")
 rows= ut.map(lambda line: line.split(";"))
 res = rows.filter(lamda row: row[2] >= "2000" and row[2] <= "2004")
 res = res.map(lambda row: (row[1],row[2],int(row[3])))

输出:

[('2001', 'Brussel', 9),
 ('2001', 'Brussel', 104),
 ('2001', 'Vlaanderen', 16),
 ('2002', 'Brussel', 12), ...]

我需要我的输出是这样的:

[('2001', 'Brussel', 113),
 ('2001', 'Vlaanderen', 16),
 ('2002', 'Brussel', 12)]

我之前用 reduceByKey 尝试过一些事情 已经看到很多关于reduceByKey的问题,但无法弄清楚。提前致谢。

【问题讨论】:

【参考方案1】:

正如A list as a key for PySpark's reduceByKey zero323 中所解释的,密钥必须实现哈希方法。你可以使用tuples:

>>> from operator import add
... 
... sc.parallelize([
...     (('2001', 'Brussel'), 9), (('2001', 'Brussel'), 104),
...     (('2001', 'Vlaanderen'), 16), (('2002', 'Brussel'), 12)
... ]).reduceByKey(add).take(2)
... 
[(('2002', 'Brussel'), 12), (('2001', 'Brussel'), 113)]

替换:

res.map(lambda row: (row[1],row[2],int(row[3])))

res.map(lambda row: ((row[1], row[2]), int(row[3])))

或将set 替换为frozenset

>>> sc.parallelize([
...     (frozenset(['2001', 'Brussel']), 9), (frozenset(['2001', 'Brussel']), 104),
...     (frozenset(['2001', 'Vlaanderen']), 16), (frozenset(['2002', 'Brussel']), 12)
... ]).reduceByKey(add).take(2)

[(frozenset('2002', 'Brussel'), 12), (frozenset('2001', 'Brussel'), 113)]

【讨论】:

谢谢!现在工作正常!

以上是关于如何在pyspark中使用reduceByKey作为多键和单值[重复]的主要内容,如果未能解决你的问题,请参考以下文章

IndexError:在pyspark shell上使用reduceByKey操作时列出索引超出范围

Pyspark - 在作为列表的 spark 数据框列上使用 reducebykey

Spark(pyspark)如何仅在3元素元组的2个元素上reduceByKey

reduceByKey PySpark 中的列表列表

PySpark reduceByKey 对多个值

PySpark reduceByKey 只有一个键