如何将列表的 RDD 转换为压缩列表的 RDD?

Posted

技术标签:

【中文标题】如何将列表的 RDD 转换为压缩列表的 RDD?【英文标题】:How do transform the RDD of lists to a RDD of zipped list? 【发布时间】:2019-05-09 06:38:25 【问题描述】:

RDD ( 列表(1, 2, 3) 列表('A','B','C') 列表('a','b','c') )

我想把它改成

RDD ( 列表(1,'A','a') 列表(2,'B','b') 列表(3,'C','c') )

我想在 PySpark 中执行此操作而不使用收集操作?

我尝试了以下方法:

    lst = [[1, 2, 3], ['A', 'B', 'C'], ['a', 'b', 'c']]
    l = sc.parallelize(lst)
    lst_new = l.reduce(lambda x,y: zip(x, y))
    for i in lst_new:
        print(i)
    
output: 
((1, 'A'), 'aa')
((2, 'B'), 'bb')
((3, 'C'), 'cc')
Required output: RDD(List(1, 'A', 'a'), List(2, 'B', 'b'), List(3, 'C', 'c'))

这样我就可以将其转换为数据框。

+--+---+---+
|A1| A2| A3|
+--+---+---+
|1 |  A| aa|
|2 |  B| bb|
|3 |  C| cc|
+--+---+---+

【问题讨论】:

第二个示例的输出似乎是错误的。我通过运行第二个示例得到(1, ('A', 'a')) (2, ('B', 'b')) (3, ('C', 'c')) 【参考方案1】:

RDD 适用于 (key, value) 对。当你 zip first RDDsecond RDD 然后 values from first RDD becomes keys for new RDDvalues from the second RDD becomes values for new RDD

现在通过示例 1 理解 -

创建 RDDS

#Python Lists
a = [1, 2, 3]
b = ['A', 'B', 'C']
c = ['a','b', 'c']

#3 Different RDDS from Python Lists
rdda = sc.parallelize(a)
rddb = sc.parallelize(b)
rddc = sc.parallelize(c)

一个接一个地压缩并检查key, value对-

d = rdda.zip(rddb)
print (d.take(1))
[(1, 'A')] # 1 is key here and 'A' is Value

d = d.zip(rddc)
print (d.take(1))
[((1, 'A'), 'a')] # (1, 'A') is key here and 'a' is Value

print (d.collect()) #This wouldn't give us desired output
[((1, 'A'), 'a'), ((2, 'B'), 'b'), ((3, 'C'), 'c')]

#To get the desired output we need to map key and values in the same object/tuple using map

print (d.map(lambda x:x[0]+(x[1], )).take(1))
[(1, 'A', 'a')]

#lambda x:x[0]+(x[1], )  Here x[0] is having tuple of keys (1, 'A') and x[1] is just a string value 'a'. Now concat key tuple and value (convert to tuple (x[1], )) 

终于转换成DF

d.map(lambda x:x[0]+(x[1], )).toDF().show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  A|  a|
|  2|  B|  b|
|  3|  C|  c|
+---+---+---+

希望这将帮助您解决第二个示例。

【讨论】:

以上是关于如何将列表的 RDD 转换为压缩列表的 RDD?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 list 应该先转换为 RDD 再转换为 Dataframe?有啥方法可以将列表转换为数据框?

如何在 Python 中解压缩 RDD 中每个项目的值(列表)?

将RDD的每一行中的键值对列表转换为每行中的单个键值

如何对 spark scala RDD 中的元组列表/数组执行转换?

Pyspark RDD 收集前 163 行

Pyspark:从列表的 RDD 创建一个火花数据框,其中列表的某些元素是对象