如何在 PySpark 中将两个 rdd 合并为一个

Posted

技术标签:

【中文标题】如何在 PySpark 中将两个 rdd 合并为一个【英文标题】:how to concat and combine two rdd into one in PySpark 【发布时间】:2020-08-13 09:18:05 【问题描述】:

我得到两个RDD 并想连接并合并为一个RDD,如下所示:

rdd_1 = ['a1', 'a2', 'a3', 'a4', 'a5', ]
rdd_2 = ['b1', 'b2', 'b3', 'b4', 'b5', ]

# concat and combine these two rdd into one
rdd = ['a1_b1', 'a2_b2', 'a3_b3', 'a4_b4', 'a5_b5']

我知道我可以将这两个 RDD 转换为 DataFrame 并将其连接到 spark.sql 中,如下所示:

df = df.withColumn('col1_col2', concat(col('col1'), lit(' '), col('col2')))

但是对于亿级样本来说效率不够。 所以想知道RRD编程有没有更快的方法。

【问题讨论】:

"a1" 和 "b1" 的组合规则是什么?你用“1”吗?将 RDD 想象成一袋大理石。其中没有预定义的顺序。所以列表的第一个元素不会自动与另一个列表的第一个元素结合。你需要定义一个规则。 【参考方案1】:

我想尝试压缩并加入:

rdd_1.zip(rdd_2).map(lambda x : '_'.join(x)).collect()

或者没有lambda:

rdd_1.zip(rdd_2).map('_'.join).collect()

例子:

rdd_1 = spark.sparkContext.parallelize(['a1', 'a2', 'a3', 'a4', 'a5', ])
rdd_2 = spark.sparkContext.parallelize(['b1', 'b2', 'b3', 'b4', 'b5', ])

rdd_1.zip(rdd_2).map(lambda x : '_'.join(x)).collect()

['a1_b1', 'a2_b2', 'a3_b3', 'a4_b4', 'a5_b5']

【讨论】:

【参考方案2】:

从列表中创建 rdds,然后在两个 rdds 上执行 zip,然后使用 map 和 join 对其进行迭代和连接。

rd1 = sc.parallelize(['a1', 'a2', 'a3', 'a4', 'a5', ])
rd2 = sc.parallelize(['b1', 'b2', 'b3', 'b4', 'b5', ])
rd1.zip(rd2).map(lambda x: x[0]+'_'+x[1]).collect()
rd1.zip(rd2).map(lambda x: '_'.join(x)).collect()
rd1.zip(rd2).map('_'.join).collect()

['a1_b1', 'a2_b2', 'a3_b3', 'a4_b4', 'a5_b5']

【讨论】:

以上是关于如何在 PySpark 中将两个 rdd 合并为一个的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中将两个 RDD[string] 合并在一起?

如何在 PySpark 中压缩两个 RDD?

如何在pyspark中将rdd行转换为带有json结构的数据框?

如何在 Pyspark 中将 Pair RDD Tuple 键转换为字符串键? [关闭]

在pySpark中将RDD拆分为n个部分

在 Pyspark 中将流水线 RDD 转换为 Dataframe [重复]