多个 RDD 的 Spark union
Posted
技术标签:
【中文标题】多个 RDD 的 Spark union【英文标题】:Spark union of multiple RDDs 【发布时间】:2016-02-18 01:24:56 【问题描述】:在我的猪代码中,我这样做:
all_combined = Union relation1, relation2,
relation3, relation4, relation5, relation 6.
我想对 spark 做同样的事情。然而,不幸的是,我看到我必须继续成对地做:
first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
# .... and so on
是否有联合运算符可以让我一次对多个 rdd 进行操作:
例如union(rdd1, rdd2,rdd3, rdd4, rdd5, rdd6)
这是一个方便的问题。
【问题讨论】:
【参考方案1】:不幸的是,这是在 Spark 中访问 UNION
表的唯一方法。然而,而不是
first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
...
您可以像这样以更简洁的方式执行它:
result = rdd1.union(rdd2).union(rdd3).union(rdd4)
【讨论】:
在 spark 中联合表的方法不止一种。此评论不正确。见上面 zero323 的评论【参考方案2】:如果这些是 RDD,你可以使用SparkContext.union
方法:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd3 = sc.parallelize([7, 8, 9])
rdd = sc.union([rdd1, rdd2, rdd3])
rdd.collect()
## [1, 2, 3, 4, 5, 6, 7, 8, 9]
没有DataFrame
等价物,但它只是一个简单的单线问题:
from functools import reduce # For Python 3.x
from pyspark.sql import DataFrame
def unionAll(*dfs):
return reduce(DataFrame.unionAll, dfs)
df1 = sqlContext.createDataFrame([(1, "foo1"), (2, "bar1")], ("k", "v"))
df2 = sqlContext.createDataFrame([(3, "foo2"), (4, "bar2")], ("k", "v"))
df3 = sqlContext.createDataFrame([(5, "foo3"), (6, "bar3")], ("k", "v"))
unionAll(df1, df2, df3).show()
## +---+----+
## | k| v|
## +---+----+
## | 1|foo1|
## | 2|bar1|
## | 3|foo2|
## | 4|bar2|
## | 5|foo3|
## | 6|bar3|
## +---+----+
如果DataFrames
的数量很大,则在RDD 上使用SparkContext.union
并重新创建DataFrame
可能是避免issues related to the cost of preparing an execution plan 的更好选择:
def unionAll(*dfs):
first, *_ = dfs # Python 3.x, for 2.x you'll have to unpack manually
return first.sql_ctx.createDataFrame(
first.sql_ctx._sc.union([df.rdd for df in dfs]),
first.schema
)
【讨论】:
这里 *rest 的目的是什么?它不在任何地方使用。 我想在单行 DF 之间执行大约 3000 个联合。使用第一个选项,在第 100 次迭代后它会以指数速度变慢(我正在使用 tqdm 进行测试)。使用第二个选项,它从一开始就很慢,并且一直线性减速。有没有更好的方法来做到这一点? @drkostas 可能不是最好的方法,但我通过保存 RDD 然后加载它并继续循环来解决这个问题。这会杀死 RDD 的历史记录,因为它会在每个新循环之前重新运行 RDDs 历史记录中的每个循环。 Spark 不喜欢循环 @Gramatik 是的,我也以同样的方式解决了问题。通过使用选项append
将每个数据帧保存在 parquet 中,然后将 parquet 加载到新的数据帧中。【参考方案3】:
您还可以在 RDD 之间为 UNION 使用加法
rdd = sc.parallelize([1, 1, 2, 3])
(rdd + rdd).collect()
## [1, 1, 2, 3, 1, 1, 2, 3]
【讨论】:
以上是关于多个 RDD 的 Spark union的主要内容,如果未能解决你的问题,请参考以下文章