如何将三个 RDD 加入一个元组?

Posted

技术标签:

【中文标题】如何将三个 RDD 加入一个元组?【英文标题】:How to join three RDDs in to a tuple? 【发布时间】:2015-07-15 16:52:44 【问题描述】:

我对 Python 中的 Apache Spark 比较陌生,这就是我想要做的。我输入的数据如下。

rdd_row 是行索引 (i) 的 RDD, rdd_col 是列索引 (j) 的 RDD, rdd_values 是值 (v) 的 RDD。

以上三个RDD都很大。

我正在尝试将它们转换为稀疏 rdd 矩阵

rdd_mat= ([rdd_row],[rdd_col],[rdd_values])

即,

rdd_mat=([i1,i2,i3 ..],[j1,j2,j3..], [v1,v2,v3 ..])

我试过了:

zip where rdd_row.zip(rdd_col).zip(rdd_val) 

但它最终给了

[(i1,j1,v1),(i2,j2,v2) ..]

rdd1.union(rdd2) 

不会创建元组。

非常感谢帮助我朝着正确的方向前进!

【问题讨论】:

您尝试做的可能不是捕捉 RDD 的好处。为什么你想要一个包含 3 个并行列表的 RDD? 谢谢院长。我的输入分为 3 个列表 - 行、列和值。我的印象是,一个包含 3 个列表的 RDD 将使我能够以更简单的方式执行矩阵运算。 【参考方案1】:

不幸的是,目前(Spark 1.4)如果您对线性代数感兴趣,Scala 和 Java 比 Python 更好。假设您输入如下:

import numpy as np
np.random.seed(323) 

rdd_row = sc.parallelize([0, 1, 1, 2, 3])
rdd_col = sc.parallelize([1, 2, 3, 4, 4])
rdd_vals = sc.parallelize(np.random.uniform(0, 1, size=5))

要获得所需形状的rdd_mat,您可以执行以下操作:

assert rdd_row.count() == rdd_col.count() == rdd_vals.count()
rdd_mat = sc.parallelize(
    (rdd_row.collect(), rdd_row.collect(), rdd_vals.collect()))

但这是一个相当糟糕的主意。正如@DeanLa 已经提到的,这里的并行处理非常有限,更不用说每个部分(例如整个行列表)都将在单个分区/节点上结束。

在不知道你想如何使用输出的情况下,很难给你一个有意义的建议,但一种方法是使用如下内容:

from pyspark.mllib.linalg import Vectors

coords = (rdd_row.
    zip(rdd_col).
    zip(rdd_vals).
    map(lambda ((row, col), val): (row, col, val)).
    cache())

ncol = coords.map(lambda (row, col, val): col).distinct().count()

rows = (coords.
    groupBy(lambda (row, col, val): row)
    .mapValues(lambda values: Vectors.sparse(
        ncol, sorted((col, val) for (row, col, val) in values))))

它将创建一个 rdd 对,表示给定行的行索引和值的稀疏向量。如果您添加一些连接或按列添加组,您可以自己实现一些典型的线性代数例程,但是对于功能齐全的分布式数据结构,最好使用 Scala / Java CoordinateMatrix 或 org.apache.spark.mllib.linalg.distributed 中的其他类

【讨论】:

感谢 zero323。这正是我所做的,我一直在遇到问题。使用 .collect() 对巨大的 rdds 效率不高。我一直在使用 zip 和 lambda 函数来解决问题,但我一直在想一定有更好的方法。正如您所建议的那样,Java CoordinateMatrix 可能适用于我的情况。再次感谢。 收集绝对是个问题,但结构本身要糟糕得多。使用groupByunion 的组合可以获得相同的效果,但实际上它根本不会更好。具有少量大元素的所需结构根本不适合 Spark。

以上是关于如何将三个 RDD 加入一个元组?的主要内容,如果未能解决你的问题,请参考以下文章

Spark 2.0:如何将元组的 RDD 转换为 DF [重复]

如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行

如何对 spark scala RDD 中的元组列表/数组执行转换?

如何将 PySpark / AWS Glue 中 RDD 的所有行加入/连接/合并成一条长线?

将每个元素视为元组时,在 PySpark 中加入 2 个 RDD

如何将地图转换为 Spark 的 RDD