在开始取决于reduceByKey结果的连接步骤之前,Spark是不是必须完成reduceByKey步骤中的所有条目的处理?
Posted
技术标签:
【中文标题】在开始取决于reduceByKey结果的连接步骤之前,Spark是不是必须完成reduceByKey步骤中的所有条目的处理?【英文标题】:Does Spark have to finish processing all entries in the reduceByKey step before starting the join step that depends on the results of the reduceByKey?在开始取决于reduceByKey结果的连接步骤之前,Spark是否必须完成reduceByKey步骤中的所有条目的处理? 【发布时间】:2016-07-06 19:44:19 【问题描述】:问题: Spark 是否必须在开始 join 步骤之前完成 reduceByKey 步骤中的所有条目的处理?
我认为答案是否定的。我认为每个分区/任务必须在继续加入之前完成 reduceByKey 任务。
详情: 在下面的示例中,我通过 key userId 减少了一个 RDD,并将具有相同用户 id 的所有值嵌套到一个列表中。
然后我将 (userid,listOfEvents) 的这个 RDD 与 (userid, otherEvent) 的另一个 RDD 连接起来。
请注意,在本例中,reduceByKey 和 join 之间的分区器是相同的(userId 上的默认 HashParitioner),那么这是否会改变 reduceByKey 是否必须在 join 之前完全处理完所有数据?
在本例中,List(eventA, eventB) 与 Event K 而没有 eventC 的情况永远不会发生,对吗?
但是,有可能 List(eventA, eventB, eventC) 加入了 Event K,但 EventD 和 Event F 还没有减少,可能会发生吗?
Impression Events
userId Event
1 eventA
1 eventB
1 eventC
2 eventD
2 eventF
Conversion Events
userId Event
1 eventK
2 eventL
// The Reduce Step
final JavaPairRDD<Long, ObjectArrayList<Event>> impressionRDD = loadImpressionEvents()
.mapToPair(event ->
final ObjectArrayList<Event> list = new ObjectArrayList();
list.add(new Event(event.getTimestamp(),
event.getCampaignIdentifier(), event.getSiteIdentifier()));
return new Tuple2<>(
event.getUserId(),
list
);
)
.reduceByKey((event1, event2) ->
// Combine impression events with the same user id
event1.addAll(event2);
return event1;
);
// The Join Step
final JavaPairRDD<Long, Tuple2<ConversionEvent, Event>> conversionImpressions = loadConversionEvents()
.mapToPair(event -> new Tuple2<>(
event.getUserId(),
event
))
.leftOuterJoin(impressionRDD);
【问题讨论】:
【参考方案1】:Spark 必须在 cogroup 和 flatten 开始之前完成 shuffle,因此在 reduceByKey 正在进行时无法开始 join。
【讨论】:
以上是关于在开始取决于reduceByKey结果的连接步骤之前,Spark是不是必须完成reduceByKey步骤中的所有条目的处理?的主要内容,如果未能解决你的问题,请参考以下文章
Spark中reduceByKey()和groupByKey()的区别