如何使用 PySpark 对两个 RDD 进行完全外连接?
Posted
技术标签:
【中文标题】如何使用 PySpark 对两个 RDD 进行完全外连接?【英文标题】:How to do a full Outer Join of two RDDs with PySpark? 【发布时间】:2016-10-12 14:49:13 【问题描述】:我正在寻找一种按键组合两个 RDD 的方法。
给定:
x = sc.parallelize([('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'),
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'),
]
)
y = sc.parallelize([('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'),
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'),
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d'),
]
)
所以我有 3 种类型的信息:ID、国家代码和邮政编码。 我想要我的 RDD 的完全外部连接。 这是我的代码:
sorted(x.fullOuterJoin(y, numPartitions = None).collect())
这就是结果:
[('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', None)),
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', 'KlGZj08d')),
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, 'KNPQLQth')),
('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, 'JmJCFu3N'))]
奇怪的是加入后邮政编码消失了! 可能有什么问题?
理想情况下,我的结果应该是这样的:
[('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160', None)),
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001', 'KlGZj08d')),
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, None, 'KNPQLQth')),
('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, None, 'JmJCFu3N'))]
我尝试做其他事情:
x.union(y).collect()
给出:
[('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'),
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'),
('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'),
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'),
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d')]
我现在想做一个 groupByKey 或一个 reduceByKey。
这是给出错误信息的代码:
sorted(x.union(y).groupByKey().mapValues(list).collect())
但是,x.union(y).groupByKey() 部分似乎有效..
有没有办法打印结果? (收集()不起作用) 任何帮助表示赞赏。谢谢!
【问题讨论】:
【参考方案1】:cogroup 在某些情况下很有用:
cogrouped = x.cogroup(y)
cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
【讨论】:
Morito : 结果:[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', ([], ['JmJCFu3N'])), ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi76, [ERR'] 'KlGZj08d'])), ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', (['TN'], [])), ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', ([], ['pan>KNPQLQ)])] 【参考方案2】:我找到了解决办法!尽管如此,这个解决方案对于我想做的事情并不完全令人满意。
所以:
x = sc.parallelize([('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'),
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'),
]
)
y = sc.parallelize([('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'),
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'),
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d'),
]
)
我创建了一个函数来指定我的密钥,该密钥将指向我的名为“x”的 rdd:
def get_keys(rdd):
new_x = rdd.map(lambda item: (item[0], (item[1], item[2])))
return new_x
new_x = get_keys(x)
给出:
[('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001')),
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160'))]
然后:
new_x.union(y).map(lambda (x, y): (x, [y])).reduceByKey(lambda p, q : p + q).collect()
结果:
[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', ['JmJCFu3N']),
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', [('FR', '75001'), 'KlGZj08d']),
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', [('TN', '8160')]),
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', ['KNPQLQth'])]
我想要的是:
[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, None, 'JmJCFu3N')),
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001', 'KlGZj08d')),
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160', None)),
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, None, 'KNPQLQth'))]
【讨论】:
以上是关于如何使用 PySpark 对两个 RDD 进行完全外连接?的主要内容,如果未能解决你的问题,请参考以下文章