pyspark中的Rdd乘法?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了pyspark中的Rdd乘法?相关的知识,希望对你有一定的参考价值。

我有两个数据框,如下面:

数据帧1:(df1)

+---+----------+
|id |features  |
+---+----------+
|8  |[5, 4, 5] |
|9  |[4, 5, 2] |
+---+----------+

数据框2:(df2)

+---+----------+
|id |features  |
+---+----------+
|1  |[1, 2, 3] |
|2  |[4, 5, 6] |
+---+----------+

之后我已经转换成Df到Rdd

rdd1=df1.rdd

如果我正在做rdd1.collect()结果如下

[Row(id=8, f=[5, 4, 5]), Row(id=9, f=[4, 5, 2])]

rdd2=df2.rdd

broadcastedrddif = sc.broadcast(rdd2.collectAsMap())

现在,如果我正在做broadcastedrddif.value

{1: [1, 2, 3], 2: [4, 5, 6]}

现在我想做rdd1和broadcastedrddif的乘法和,即它应该返回如下的输出。

((8,[(1,(5*1+2*4+5*3)),(2,(5*4+4*5+5*6))]),(9,[(1,(4*1+5*2+2*3)),(2,(4*4+5*5+2*6)]) ))

所以我的最终输出应该是

((8,[(1,28),(2,70)]),(9,[(1,20),(2,53)]))

其中(1,28)是元组而不是浮点数。

请帮帮我。

答案

我不明白你为什么使用sc.broadcast()但我还是用它...在这种情况下非常有用mapValues在最后一个RDD上,我使用列表理解来使用字典执行操作。

x1=sc.parallelize([[8,5,4,5], [9,4,5,2]]).map(lambda x: (x[0], (x[1],x[2],x[3])))
x1.collect()
x2=sc.parallelize([[1,1,2,3], [2,4,5,6]]).map(lambda x: (x[0], (x[1],x[2],x[3])))
x2.collect()
#I took immediately an RDD because is more simply to test
broadcastedrddif = sc.broadcast(x2.collectAsMap())
d2=broadcastedrddif.value

def sum_prod(x,y):
    c=0
    for i in range(0,len(x)):
        c+=x[i]*y[i]
    return c
x1.mapValues(lambda x: [(i, sum_prod(list(x),list(d2[i]))) for i in [k for k in d2.keys()]]).collect()
Out[19]: [(8, [(1, 28), (2, 70)]), (9, [(1, 20), (2, 53)])]

以上是关于pyspark中的Rdd乘法?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark - ALS 输出中的 RDD 到 DataFrame

删除 RDD、Pyspark 中的停用词

pyspark中的RDD到DataFrame(来自rdd的第一个元素的列)

将 RDD 行拆分到 Pyspark 中的不同列

了解 PySpark 中的 RDD(来自并行化)

从 Pyspark 中的 RDD 中提取字典