Pyspark:基于类型的值操作
Posted
技术标签:
【中文标题】Pyspark:基于类型的值操作【英文标题】:Pyspark: operation on values based on type 【发布时间】:2016-03-03 22:44:07 【问题描述】:我有这样的RDD:
[('a', ('H', 1)), ('b', (('H', 41), ('S', 1)))]
这样键可以具有单个tuple
或tuples
的tuple
作为值。这来自reduceByKey
。
我需要执行一个简单的操作:将 S 的计数除以 (H + S) 的计数。
当 S 不存在时,例如第一项,我将不得不返回 0。
问题是将第一种情况(单个tuple
)与第二种情况(tuple
of两个tuples
)隔离开来,以便我知道如何在map
中操作。
我将如何进行?
【问题讨论】:
你首先如何获得这样的数据?它是某种具有特定含义的层次结构吗?如果不是,我会更有意义地在上游强制执行一致的整形器。 Python 从 3.4 开始提供基本的调度机制,但这些机制很浅。 【参考方案1】:一般来说,在上游修复此问题会更有意义,但您可以尝试以下方法:
from operator import truediv
def f(vs):
try:
d = dict(vs)
except ValueError:
d = dict([vs])
s = sum(d.values())
return truediv(d.get("S", 0), s) if s else float('nan')
rdd = sc.parallelize([('a', ('H', 1)), ('b', (('H', 41), ('S', 1)))])
rdd.mapValues(f).collect()
## [('a', 0.0), ('b', 0.023809523809523808)]
或者,如果你不介意外部依赖,可以尝试使用multipledispatch
:
from multipledispatch import dispatch
@dispatch(tuple, tuple)
def f(h, s):
try:
return truediv(s[1], h[1] + s[1])
except ZeroDivisionError:
return float('nan')
@dispatch(str, int)
def f(x, y):
return 0.0
rdd.mapValues(lambda args: f(*args)).collect()
## [('a', 0.0), ('b', 0.023809523809523808)]
【讨论】:
我喜欢调度方法,它完全可以隐藏isinstance
的丑陋之处。但它需要相对复杂的代码。如果走一条快乐的道路(不要输入except
阻止第一个应该更快)。异常处理非常昂贵,所以我预计摊销成本会更高。一般来说,我真的会在上游修复这个问题并提供可以直接使用的输入。以上是关于Pyspark:基于类型的值操作的主要内容,如果未能解决你的问题,请参考以下文章