连接具有多个值组件的两个 RDD 并将结果展平

Posted

技术标签:

【中文标题】连接具有多个值组件的两个 RDD 并将结果展平【英文标题】:Joining two RDDs with multiple value components and flattening the result 【发布时间】:2018-12-05 16:11:44 【问题描述】:

我有 2 个具有相同键的 RDD,但值类型不同(超过 2 个值)。我想按键加入这些 RDD,然后将它们的值附加到最后的元组中(见下文)。最好的方法是什么?

rdd1 = sc.parallelize([ (1, "test1", [5,6,7]), (2, "test2", [1,2,3]) ])
rdd2 = sc.parallelize([ (1, "Foo"), (2, "Bar") ])

期望的输出RDD

[ (1, "Foo", "test1", [5,6,7]), (2, "Bar", "test2", [1,2,3]) ]

直接加入不起作用:

print(rdd2.join(rdd1).collect())
#[(1, ('Foo', 'test1')), (2, ('Bar', 'test2'))]

这会忽略rdd1 中的其余值,并且输出格式错误。

【问题讨论】:

为什么要提到左外连接? 请更新您的问题,因为左外连接不会检索您选择的最终 RDD。 【参考方案1】:

可以在此处使用join,前提是您首先将rdds 映射为(key, value) 的形式。

rdd1 = sc.parallelize([ (1, "test1", [5,6,7]), (2, "test2", [1,2,3]) ])
rdd2 = sc.parallelize([ (1, "Foo"), (2, "Bar") ])

def map_to_kvp(row):
    if len(row) < 3:
        return row
    return (row[0], tuple(row[1:]))

rdd3 = rdd2.map(map_to_kvp).join(rdd1.map(map_to_kvp))
print(rdd3.collect())
#[
#    (1, ('Foo', ('test1', [5, 6, 7]))), 
#    (2, ('Bar', ('test2', [1, 2, 3])))
#]

现在您已将所有数据放在正确的位置,但您只需将结果行展平即可。

在这种情况下,您必须编写自己的 flatten 函数以避免同时将 stringlist 变平。

我们可以在this answer 到How can I flatten lists without splitting strings? 的基础上创建自己的功能:

def flatten(foo):
    for x in foo:
        if hasattr(x, '__iter__') and not isinstance(x, str) and not isinstance(x, list):
            for y in flatten(x):
                yield y
        else:
            yield x

rdd4 = rdd3.map(lambda row: tuple(flatten(row)))
print(rdd4.collect())
#[(1, 'Foo', 'test1', [5, 6, 7]), (2, 'Bar', 'test2', [1, 2, 3])]

【讨论】:

谢谢,我认为 join 可以在具有多个值的元组上完成,但必须将所有值分组到一个元组中......

以上是关于连接具有多个值组件的两个 RDD 并将结果展平的主要内容,如果未能解决你的问题,请参考以下文章

雪花中具有多个 JSON 对象的横向展平数组

如何使用 MySQL 展平左连接的结果?

透视具有多个值的表

如何使用具有多个结果的子查询将值插入表中?

使用 vba 连接多个范围

Mysql - 将值插入具有未知主键的多个表中