Spark - 嵌套 RDD 操作

Posted

技术标签:

【中文标题】Spark - 嵌套 RDD 操作【英文标题】:Spark - Nested RDD Operation 【发布时间】:2015-10-21 10:54:46 【问题描述】:

我有两个 RDD 说

   rdd1 = 
id            | created     | destroyed | price   
1            | 1            | 2            | 10        
2            | 1            | 5            | 11       
3            | 2            | 3            | 11        
4            | 3            | 4            | 12        
5            | 3            | 5            | 11       

rdd2 =

[1,2,3,4,5] # lets call these value as timestamps (ts)

rdd2 基本上是使用 range(initial_value, end_value, interval) 生成的。这里的参数可以变化。大小可以与 rdd1 相同或不同。这个想法是使用过滤条件根据 rdd2 的值将记录从 rdd1 提取到 rdd2(从 rdd1 的记录可以在提取时重复,如您在输出中看到的那样)

过滤条件 rdd1.created

预期输出:

ts             | prices  
1              | 10,11       # i.e. for ids 1,2 of rdd1      
2              | 11,11       # ids 2,3
3              | 11,12,11    # ids 2,4,5 
4              | 11,11       # ids 2,5

现在我想根据一些使用RDD2的键的条件过滤RDD1。 (如上所述)并返回连接RDD2的键和RDD1的过滤结果的结果

所以我这样做:

rdd2.map(lambda x : somefilterfunction(x, rdd1))  

def somefilterfunction(x, rdd1):
    filtered_rdd1 = rdd1.filter(rdd1[1] <= x).filter(rdd1[2] > x)
    prices = filtered_rdd1.map(lambda x : x[3])
    res = prices.collect()
    return (x, list(res))

我得到:

例外:您似乎正在尝试广播 RDD 或 从动作或转换中引用 RDD。 RDD 转换 并且动作只能由驱动程序调用,不能在其他内部调用 转变;例如,rdd1.map(lambda x: rdd2.values.count() * x) 无效,因为值转换和计数操作 不能在 rdd1.map 转换内执行。更多 信息,请参阅 SPARK-5063。

我尝试使用 groupBy ,但由于这里 rdd1 的元素可以一次又一次地重复,而我知道分组只会将 rdd1 的每个元素组合在某个特定的插槽中一次。

现在唯一的方法是使用普通的 for 循环并进行过滤并最终加入所有内容。

有什么建议吗?

【问题讨论】:

【参考方案1】:

由于您使用常规范围,因此根本没有理由创建第二个 RDD。您可以简单地为每条记录生成特定范围内的值:

from __future__ import division # Required only for Python 2.x
from math import ceil
from itertools import takewhile

rdd1 = sc.parallelize([
    (1, 1, 2, 10),        
    (2, 1, 5, 11),       
    (3, 2, 3, 11),        
    (4, 3, 4, 12),        
    (5, 3, 5, 11),  
])


def generate(start, end, step):
    def _generate(id, created, destroyed, price):
        # Smallest ts >= created
        start_for_record = int(ceil((created - start) / step) * step + start)
        rng = takewhile(
            lambda x: created <= x < destroyed,
            xrange(start_for_record, end, step)) # In Python 3.x use range
        for i in rng:
            yield i, price

    return _generate

result = rdd1.flatMap(lambda x: generate(1, 6, 1)(*x)).groupByKey()

结果:

result.mapValues(list).collect()

## [(1, [10, 11]), (2, [11, 11]), (3, [11, 12, 11]), (4, [11, 11])]

【讨论】:

这个解决方案教会了我几件事,其中之一是 flatMap 的一个很好的用例、takewhile 函数的使用以及如何以这种方式耦合生成器。只是为了知识 - 如果 rdd2 不是可以轻松生成的常规范围,您会选择哪种方法。 这是一个宽泛的问题,但一般来说范围分区、排序和状态扫描应该可以解决问题。

以上是关于Spark - 嵌套 RDD 操作的主要内容,如果未能解决你的问题,请参考以下文章

Spark笔记:RDD基本操作(上)

spark的Pair RDD的转化操作

Spark算子

Spark RDD基本概念宽窄依赖转换行为操作

Spark的RDD简单操作

Spark RDD Action操作