Spark groupBy vs repartition plus mapPartitions
Posted
技术标签:
【中文标题】Spark groupBy vs repartition plus mapPartitions【英文标题】: 【发布时间】:2019-01-16 12:07:01 【问题描述】:我的数据集大约有 2000 万行,需要大约 8 GB 的 RAM。我正在使用 2 个执行器运行我的工作,每个执行器 10 GB RAM,每个执行器 2 个内核。由于进一步的转换,数据应该一次被缓存。
我需要根据 4 个字段减少重复项(选择任何重复项)。两个选项:使用groupBy
和使用repartition
和mapPartitions
。第二种方法允许您指定分区数,因此在某些情况下可以更快地执行,对吧?
你能解释一下哪个选项的性能更好吗?两个选项的 RAM 消耗是否相同?
使用groupBy
dataSet
.groupBy(col1, col2, col3, col4)
.agg(
last(col5),
...
last(col17)
);
使用repartition
和mapPartitions
dataSet.sqlContext().createDataFrame(
dataSet
.repartition(parallelism, seq(asList(col1, col2, col3, col4)))
.toJavaRDD()
.mapPartitions(DatasetOps::reduce),
SCHEMA
);
private static Iterator<Row> reduce(Iterator<Row> itr)
Comparator<Row> comparator = (row1, row2) -> Comparator
.comparing((Row r) -> r.getAs(name(col1)))
.thenComparing((Row r) -> r.getAs(name(col2)))
.thenComparingInt((Row r) -> r.getAs(name(col3)))
.thenComparingInt((Row r) -> r.getAs(name(col4)))
.compare(row1, row2);
List<Row> list = StreamSupport
.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.ORDERED), false)
.collect(collectingAndThen(toCollection(() -> new TreeSet<>(comparator)), ArrayList::new));
return list.iterator();
【问题讨论】:
【参考方案1】:第二种方法允许您指定分区数,因此在某些情况下可以更快地执行,对吧?
不是真的。这两种方法都允许您指定分区数 - 在第一种情况下通过 spark.sql.shuffle.partitions
spark.conf.set("spark.sql.shuffle.partitions", parallelism)
但是,如果重复很常见,则第二种方法本质上效率较低,因为它首先洗牌,然后减少,跳过映射端减少(换句话说,它是另一种按键分组)。如果重复很少见,但这不会有太大区别。
附带说明Dataset
已经提供了dropDuplicates
variants,它采用一组列,而first
/ last
在这里没有特别的意义(参见How to select the first row of each group? 中的讨论)。
【讨论】:
哇!不知道dropDuplicates
方法。非常感谢!以上是关于Spark groupBy vs repartition plus mapPartitions的主要内容,如果未能解决你的问题,请参考以下文章
Spark 中的 Rebalance 操作以及与Repartition操作的区别
在 Spark 2 中使用 DataSet.repartition - 多个任务处理多个分区
Spark中repartition和partitionBy的区别