如何使用 PySpark 对两个 RDD 进行完全外连接?

Posted

技术标签:

【中文标题】如何使用 PySpark 对两个 RDD 进行完全外连接?【英文标题】:How to do a full Outer Join of two RDDs with PySpark? 【发布时间】:2016-10-12 14:49:13 【问题描述】:

我正在寻找一种按键组合两个 RDD 的方法。

给定:

x = sc.parallelize([('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'),
                    ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'),
                   ]
                  )

y = sc.parallelize([('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'),
                    ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'),
                    ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d'),
                   ]
                  )

所以我有 3 种类型的信息:ID、国家代码和邮政编码。 我想要我的 RDD 的完全外部连接。 这是我的代码:

sorted(x.fullOuterJoin(y, numPartitions = None).collect())

这就是结果:

[('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', None)),
 ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', 'KlGZj08d')),
 ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, 'KNPQLQth')),
 ('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, 'JmJCFu3N'))]

奇怪的是加入后邮政编码消失了! 可能有什么问题?

理想情况下,我的结果应该是这样的:

[('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160', None)),
 ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001', 'KlGZj08d')),
 ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, None, 'KNPQLQth')),
 ('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, None, 'JmJCFu3N'))]  

我尝试做其他事情:

x.union(y).collect()

给出:

[('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'),
 ('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'),
 ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'),
 ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d')]

我现在想做一个 groupByKey 或一个 reduceByKey。

这是给出错误信息的代码:

sorted(x.union(y).groupByKey().mapValues(list).collect())

但是,x.union(y).groupByKey() 部分似乎有效..

有没有办法打印结果? (收集()不起作用) 任何帮助表示赞赏。谢谢!

【问题讨论】:

【参考方案1】:

cogroup 在某些情况下很有用:

 cogrouped = x.cogroup(y)

 cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()

【讨论】:

Morito : 结果:[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', ([], ['JmJCFu3N'])), ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi76, [ERR'] 'KlGZj08d'])), ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', (['TN'], [])), ('_guid_hG88Yt5EUsqT8a06Cy380ga​​3XHPwaFylNyuvvqDslCw=', ([], ['pan>KNPQLQ)])] 【参考方案2】:

我找到了解决办法!尽管如此,这个解决方案对于我想做的事情并不完全令人满意。

所以:

x = sc.parallelize([('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'),
                ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'),
               ]
              )
y = sc.parallelize([('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'),
                ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'),
                ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d'),
               ]
              )

我创建了一个函数来指定我的密钥,该密钥将指向我的名为“x”的 rdd:

def get_keys(rdd):

    new_x = rdd.map(lambda item: (item[0], (item[1], item[2])))
    return new_x

new_x = get_keys(x)

给出:

[('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001')),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160'))]

然后:

new_x.union(y).map(lambda (x, y): (x, [y])).reduceByKey(lambda p, q : p + q).collect()

结果:

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', ['JmJCFu3N']),
 ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', [('FR', '75001'), 'KlGZj08d']),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', [('TN', '8160')]),
 ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', ['KNPQLQth'])]

我想要的是:

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, None, 'JmJCFu3N')),
 ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001', 'KlGZj08d')),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160', None)),
 ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, None, 'KNPQLQth'))]  

【讨论】:

以上是关于如何使用 PySpark 对两个 RDD 进行完全外连接?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Pyspark 组合两个 Dstream(类似于普通 RDD 上的 .zip)

PySpark 使用函数创建多索引配对 RDD

使用 pyspark 交叉组合两个 RDD

如何在 PySpark 中将两个 rdd 合并为一个

如何在 PySpark 中压缩两个 RDD?

在 pyspark 中合并两个 RDD