Spark in Python Working with Tuples - 如何在加入两个 RDD 后合并两个元组

Posted

技术标签:

【中文标题】Spark in Python Working with Tuples - 如何在加入两个 RDD 后合并两个元组【英文标题】:Spark in Python Working with Tuples - How can I merge two tuples after joining two RDDs 【发布时间】:2017-01-23 01:04:42 【问题描述】:

我是 Spark 环境和开发的新手。

我有两个 RDD,通过连接器合并,连接器的结果如下:

(u'10611', ((u'Laura', u'Mcgee'), (u'66821', u'COMPLETE')))
(u'4026', ((u'Mary', u'Smith'), (u'3237', u'COMPLETE')))
(u'4026', ((u'Mary', u'Smith'), (u'4847', u'CLOSED')))

如果您看到我有两个元组和一个键,我想合并两个元组并将其保留为键和一个元组,如下所示:

(u'10611', (u'Laura', u'Mcgee', u'66821', u'COMPLETE'))
(u'4026', (u'Mary', u'Smith', u'3237', u'COMPLETE'))
(u'4026', (u'Mary', u'Smith', u'4847', u'CLOSED'))

另外,我如何在 saveAsTextFile 之前格式化它,由 Tab 分隔。示例

10611   Laura   Mcgee   66821   COMPLETE
4026    Mary    Smith   3237    COMPLETE
4026    Mary    Smith   4847    CLOSED

我有这样的东西,但不知道如何使用元组访问它:

.map(lambda x: "%s\t%s\t%s\t%s" %(x[0], x[1], x[2], x[3]))

【问题讨论】:

【参考方案1】:

假设您的数据格式一致,您可以使用简单的加法运算符合并您的元组...

>>> weird = (u'10611', ((u'Laura', u'Mcgee'), (u'66821', u'COMPLETE')))
>>> weirdMerged = (weird[0], (weird[1][0]+weird[1][1]))
>>> weirdMerged
(u'10611', (u'Laura', u'Mcgee', u'66821', u'COMPLETE'))

输出到文本应该很简单,但你古怪的结构也让它有点奇怪。你的 lambda 还不错,但你也可以这样做:

>>> print('\t'.join((weirdMerged[0],)+weirdMerged[1]))
10611   Laura   Mcgee   66821   COMPLETE

我不确定这是否更好,但它确实有效。

【讨论】:

这个有效:merge=cust_j_orders.map(lambda x: (x[0],(x[1][0]+x[1][1]))) 说实话,我对 PySpark 并不是很熟悉。我只是假设它是 Python 的超集,所以 Python 语法可以工作。 是的,我使用了你给我的代码并将其转换为 PySpark 并工作...... :) 非常感谢 如果有效,如果您对我的答案进行投票并将其标记为正确答案,我将不胜感激。 我试过了,但显然因为我是新来的,我不会让更改号码,因为我的声誉低于 15。对此感到抱歉:(【参考方案2】:

你也可以使用列表/元组推导来做这个例子:

my_tuple = (u'10611', ((u'Laura', u'Mcgee'), (u'66821', u'COMPLETE')))
new_tuple = (my_tuple[0], tuple(j for k in my_tuple[1] for j in k))

输出:

print(new_tuple)
>>> ('10611', ('Laura', 'Mcgee', '66821', 'COMPLETE'))

然后,要格式化您的输出,您也可以执行以下操作:

print("0\t1" .format(new_tuple[0], "\t".join(k for k in new_tuple[1])))

输出:

>>> 10611   Laura    Mcgee   66821   COMPLETE

【讨论】:

欢迎。如果我的回答解决了你的问题,你可以接受。 我无法使用它,由于某种原因,当我将其转换为 pyspark 时,输出为 '1\t0' fmt=cust_j_orders.map(lambda (new_tuple, k): "0\t1" .format(new_tuple[0], "\t".join(k for k in new_tuple[1]))) 这个“tuple(j for k in my_tuple[1] for j in k)”可能比我做的简单添加更通用。但对我来说,这是令人困惑的 PERL 级别。尽管它应该是更优雅、更易读的语言,但与更现代的语言相比,Python 确实存在一些缺陷。

以上是关于Spark in Python Working with Tuples - 如何在加入两个 RDD 后合并两个元组的主要内容,如果未能解决你的问题,请参考以下文章

Working with Excel Files in Python

成都W酒店 | GLOW IN SPARK 夜光燎媛主题派对

Working with Deployment Configurations in CodeDeploy

ANN in spark MLLib

Working with LOBs in Oracle and PHP

Ubuntu – Copy/paste and drag&drop not working in VMware machine with Ubuntu