RDD的Pyspark平均间隔

Posted

技术标签:

【中文标题】RDD的Pyspark平均间隔【英文标题】:Pyspark Average interval for RDD 【发布时间】:2019-11-09 21:49:37 【问题描述】:

我正在尝试使用 PySpark 查找相邻元组列表之间的平均差异。

例如,如果我有这样的 RDD

vals = [(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]

我想找出每个键的平均差异。

例如键值“2”

平均差异为 (abs(110-130) + abs(130-120))/2 = 15。

这是我目前的方法。我正在尝试更改平均计算代码以适应这种情况。但它似乎不起作用。

from pyspark import SparkContext
aTuple = (0,0)
interval = vals.aggregateByKey(aTuple, lambda a,b: (abs(a[0] - b),a[1] + 1),
                                       lambda a,b: (a[0] + b[0], a[1] + b[1]))
finalResult = interval.mapValues(lambda v: (v[0]/v[1])).collect()

我想使用 RDD 函数来执行此操作,而不是使用 Spark SQL 或任何其他附加包。

最好的方法是什么?

如果您有任何问题,请告诉我。

感谢您的宝贵时间。

【问题讨论】:

【参考方案1】:

我想出了一个天真的方法。我不确定这是否适用于所有情况。它是这样的。

让我们首先创建一个函数来计算移动平均线。如果这不是计算移动平均线的正确方法,请纠正我。

def get_abs(num_list):
    '''
    >>> get_abs([110, 130, 120])
    15.0
    '''
    acc = 0
    num_pairs = 0
    for i in range(len(num_list)-1):
        acc += abs(num_list[i]-num_list[i+1])
        num_pairs +=1
    return acc/num_pairs

接下来,我们将列表并行化

>>> vals = [(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]
>>> rdd = sc.parallelize(vals)
>>> rdd.collect()
[(2, 110),
 (2, 130),
 (2, 120),
 (3, 200),
 (3, 206),
 (3, 206),
 (4, 150),
 (4, 160),
 (4, 170)]

然后,将属于同一列表的值分组。

>>> vals = rdd.groupByKey().mapValues(list)
>>> vals.collect()
[(4, [150, 160, 170]), (2, [110, 130, 120]), (3, [200, 206, 206])]

然后我们只需要调用我们上面定义的函数来计算分组值的移动平均值。

>>> vals.mapValues(get_abs).collect()
[(4, 10.0), (2, 15.0), (3, 3.0)]

【讨论】:

这太棒了,我有时会忘记朴素的方法可能是最好的。感谢您的帮助,非常感谢!

以上是关于RDD的Pyspark平均间隔的主要内容,如果未能解决你的问题,请参考以下文章

pyspark:将稀疏局部矩阵转换为 RDD

RDD编程初级实践

RDD编程初级实践

RDD编程初级实践

删除 RDD、Pyspark 中的停用词

PySpark,按键交叉