pyspark中的Rdd乘法?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了pyspark中的Rdd乘法?相关的知识,希望对你有一定的参考价值。
我有两个数据框,如下面:
数据帧1:(df1)
+---+----------+
|id |features |
+---+----------+
|8 |[5, 4, 5] |
|9 |[4, 5, 2] |
+---+----------+
数据框2:(df2)
+---+----------+
|id |features |
+---+----------+
|1 |[1, 2, 3] |
|2 |[4, 5, 6] |
+---+----------+
之后我已经转换成Df到Rdd
rdd1=df1.rdd
如果我正在做rdd1.collect()
结果如下
[Row(id=8, f=[5, 4, 5]), Row(id=9, f=[4, 5, 2])]
rdd2=df2.rdd
broadcastedrddif = sc.broadcast(rdd2.collectAsMap())
现在,如果我正在做broadcastedrddif.value
{1: [1, 2, 3], 2: [4, 5, 6]}
现在我想做rdd1和broadcastedrddif的乘法和,即它应该返回如下的输出。
((8,[(1,(5*1+2*4+5*3)),(2,(5*4+4*5+5*6))]),(9,[(1,(4*1+5*2+2*3)),(2,(4*4+5*5+2*6)]) ))
所以我的最终输出应该是
((8,[(1,28),(2,70)]),(9,[(1,20),(2,53)]))
其中(1,28)是元组而不是浮点数。
请帮帮我。
答案
我不明白你为什么使用sc.broadcast()
但我还是用它...在这种情况下非常有用mapValues在最后一个RDD上,我使用列表理解来使用字典执行操作。
x1=sc.parallelize([[8,5,4,5], [9,4,5,2]]).map(lambda x: (x[0], (x[1],x[2],x[3])))
x1.collect()
x2=sc.parallelize([[1,1,2,3], [2,4,5,6]]).map(lambda x: (x[0], (x[1],x[2],x[3])))
x2.collect()
#I took immediately an RDD because is more simply to test
broadcastedrddif = sc.broadcast(x2.collectAsMap())
d2=broadcastedrddif.value
def sum_prod(x,y):
c=0
for i in range(0,len(x)):
c+=x[i]*y[i]
return c
x1.mapValues(lambda x: [(i, sum_prod(list(x),list(d2[i]))) for i in [k for k in d2.keys()]]).collect()
Out[19]: [(8, [(1, 28), (2, 70)]), (9, [(1, 20), (2, 53)])]
以上是关于pyspark中的Rdd乘法?的主要内容,如果未能解决你的问题,请参考以下文章
PySpark - ALS 输出中的 RDD 到 DataFrame