在“pyspark.resultiterable.ResultIterable”上调用 Distinct
Posted
技术标签:
【中文标题】在“pyspark.resultiterable.ResultIterable”上调用 Distinct【英文标题】:Call Distinct on 'pyspark.resultiterable.ResultIterable' 【发布时间】:2015-06-21 18:24:43 【问题描述】:我正在编写一些 spark 代码,并且我有一个看起来像
的 RDD[(4, <pyspark.resultiterable.ResultIterable at 0x9d32a4c>),
(1, <pyspark.resultiterable.ResultIterable at 0x9d32cac>),
(5, <pyspark.resultiterable.ResultIterable at 0x9d32bac>),
(2, <pyspark.resultiterable.ResultIterable at 0x9d32acc>)]
我需要做的是在 pyspark.resultiterable.ResultIterable
上调用一个 distinct
我试过了
def distinctHost(a, b):
p = sc.parallelize(b)
return (a, p.distinct())
mydata.map(lambda x: distinctHost(*x))
但我得到一个错误:
例外:您似乎正在尝试引用 来自广播变量、动作或转换的 SparkContext。 SparkContext 只能在驱动程序上使用,不能在它运行的代码中使用 对工人。有关详细信息,请参阅 SPARK-5063。
错误是不言自明的,我不能使用 sc。但我需要找到一种方法将 pyspark.resultiterable
.ResultIterable
覆盖到 RDD,以便我可以在其上调用 distinct。
【问题讨论】:
【参考方案1】:直截了当的方法是使用集合:
from numpy.random import choice, seed
seed(323)
keys = (4, 1, 5, 2)
hosts = [
u'in24.inetnebr.com',
u'ix-esc-ca2-07.ix.netcom.com',
u'uplherc.upl.com',
u'slppp6.intermind.net',
u'piweba4y.prodigy.com'
]
pairs = sc.parallelize(zip(choice(keys, 20), choice(hosts, 20))).groupByKey()
pairs.map(lambda (k, v): (k, set(v))).take(3)
结果:
[(1, u'ix-esc-ca2-07.ix.netcom.com', u'slppp6.intermind.net'),
(2,
u'in24.inetnebr.com',
u'ix-esc-ca2-07.ix.netcom.com',
u'slppp6.intermind.net',
u'uplherc.upl.com'),
(4, u'in24.inetnebr.com', u'piweba4y.prodigy.com', u'uplherc.upl.com')]
如果有使用rdd.disinct
的特殊原因,您可以尝试以下方法:
def distinctHost(pairs, key):
return (pairs
.filter(lambda (k, v): k == key)
.flatMap(lambda (k, v): v)
.distinct())
[(key, distinctHost(pairs, key).collect()) for key in pairs.keys().collect()]
【讨论】:
以上是关于在“pyspark.resultiterable.ResultIterable”上调用 Distinct的主要内容,如果未能解决你的问题,请参考以下文章
NOIP 2015 & SDOI 2016 Round1 & CTSC 2016 & SDOI2016 Round2游记