将 RDD 中的字段添加到其他 RDD

Posted

技术标签:

【中文标题】将 RDD 中的字段添加到其他 RDD【英文标题】:Add field from RDD to other RDD 【发布时间】:2017-10-29 15:20:01 【问题描述】:

我正在使用 Pyspark,并且必须使用 RDD(不是数据帧)来执行以下操作:

我有两个 RDD,rdd1,包含 100 多个带名称的字段和 rdd2,包含一个名为“city”的字段。 rdd1 和 rdd2 具有相同的行数(相同的长度)。

rdd1 是这样的:

Row(name="Jack", age=35, state="California", ...)  
Row(name"Jane", age=29, state="Florida", ...)  
...  

rdd2 是这样的:

Row(city="LA")  
Row(city="Miami")  
...

我希望 rdd1 变成:

Row(name="Jack", age=35, state="California", ..., city="LA")  
...

我尝试过的一切都失败了。有什么建议吗?

【问题讨论】:

【参考方案1】:

使用可用于 rdds 的 zip 方法。

rdd_zip = rdd1.zip(rdd2) 

#Flatten the rdd
rdd_final = rdd_zip.map(lambda x: tuple(list(x[0]) + [x[1]]))

【讨论】:

谢谢解答,第一行压缩了rdd,但是第二行不行! @JackHoe 最后一行出现什么错误? 最后一行似乎有效,但是当我调用 rdd_final.take(3) 进行查看时,它只是抛出了这个错误: Py4JJavaError: An error occurred while calling z:org.apache.spark .api.python.PythonRDD.runJob。 :org.apache.spark.SparkException:作业因阶段失败而中止:阶段 26.0 中的任务 0 失败 1 次,最近一次失败:阶段 26.0 中丢失任务 0.0(TID 45,本地主机,执行程序驱动程序):org.apache.spark .api.python.PythonException:回溯(最近一次调用最后一次):文件“D:\opt\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\wo ... TypeError: “int”对象不可迭代 @JackHoe 试试rdd_zip.map(lambda x: [elem for elem in x])。如果不行可以打印几行rdd_zip检查一下? 现在它可以工作了,但结果并不完全是我正在寻找的 RDD_ZIP IS LIKE : [(Row(name="Jack", age=35, ...), "LA") , (Row....)] ########### RDD_FINAL IS LIKE : [[Row(name="Jack",age=35,..), "LA"], [Row. ..]] ########### 我喜欢 [(Row(name="Jack,age=35,city="LA"), (Row...)]

以上是关于将 RDD 中的字段添加到其他 RDD的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行

修改 Spark RDD foreach 中的集合

将每个 RDD 值与 Scala 中 RDD 中的所有其他值配对

将列添加到 RDD Spark 1.2.1

Spark RDD案例:计算总成绩

如果 RDD 变大,Spark 将如何反应?