Pyspark:基于类型的值操作

Posted

技术标签:

【中文标题】Pyspark:基于类型的值操作【英文标题】:Pyspark: operation on values based on type 【发布时间】:2016-03-03 22:44:07 【问题描述】:

我有这样的RDD:

[('a', ('H', 1)), ('b', (('H', 41), ('S', 1)))]

这样键可以具有单个tupletuplestuple 作为值。这来自reduceByKey。 我需要执行一个简单的操作:将 S 的计数除以 (H + S) 的计数。 当 S 不存在时,例如第一项,我将不得不返回 0。 问题是将第一种情况(单个tuple)与第二种情况(tuple of两个tuples)隔离开来,以便我知道如何在map中操作。

我将如何进行?

【问题讨论】:

你首先如何获得这样的数据?它是某种具有特定含义的层次结构吗?如果不是,我会更有意义地在上游强制执行一致的整形器。 Python 从 3.4 开始提供基本的调度机制,但这些机制很浅。 【参考方案1】:

一般来说,在上游修复此问题会更有意义,但您可以尝试以下方法:

from operator import truediv

def f(vs):
    try:
        d = dict(vs)
    except ValueError:
        d = dict([vs])

    s = sum(d.values())
    return truediv(d.get("S", 0), s) if s else float('nan')

rdd = sc.parallelize([('a', ('H', 1)), ('b', (('H', 41), ('S', 1)))])
rdd.mapValues(f).collect()

## [('a', 0.0), ('b', 0.023809523809523808)]

或者,如果你不介意外部依赖,可以尝试使用multipledispatch

from multipledispatch import dispatch

@dispatch(tuple, tuple)
def f(h, s):
    try:
        return truediv(s[1], h[1] + s[1])
    except ZeroDivisionError:
        return float('nan')

@dispatch(str, int)
def f(x, y):
    return 0.0

rdd.mapValues(lambda args: f(*args)).collect()
## [('a', 0.0), ('b', 0.023809523809523808)]

【讨论】:

我喜欢调度方法,它完全可以隐藏isinstance 的丑陋之处。但它需要相对复杂的代码。如果走一条快乐的道路(不要输入except 阻止第一个应该更快)。异常处理非常昂贵,所以我预计摊销成本会更高。一般来说,我真的会在上游修复这个问题并提供可以直接使用的输入。

以上是关于Pyspark:基于类型的值操作的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 过滤数组中基于 RDD 的值

Pyspark:检查数组类型列是不是包含列表中的值[重复]

基于另一列中的值的一列上的pyspark滞后函数

在pyspark中添加数据类型为字符串格式的两列的值

如果 pyspark 数据帧的行基于两列的值位于另一个数据帧中,如何删除它们?

pyspark:仅基于 rdd 的操作