在开始取决于reduceByKey结果的连接步骤之前,Spark是不是必须完成reduceByKey步骤中的所有条目的处理?

Posted

技术标签:

【中文标题】在开始取决于reduceByKey结果的连接步骤之前,Spark是不是必须完成reduceByKey步骤中的所有条目的处理?【英文标题】:Does Spark have to finish processing all entries in the reduceByKey step before starting the join step that depends on the results of the reduceByKey?在开始取决于reduceByKey结果的连接步骤之前,Spark是否必须完成reduceByKey步骤中的所有条目的处理? 【发布时间】:2016-07-06 19:44:19 【问题描述】:

问题: Spark 是否必须在开始 join 步骤之前完成 reduceByKey 步骤中的所有条目的处理?

我认为答案是否定的。我认为每个分区/任务必须在继续加入之前完成 reduceByKey 任务。

详情: 在下面的示例中,我通过 key userId 减少了一个 RDD,并将具有相同用户 id 的所有值嵌套到一个列表中。

然后我将 (userid,listOfEvents) 的这个 RDD 与 (userid, otherEvent) 的另一个 RDD 连接起来。

请注意,在本例中,reduceByKey 和 join 之间的分区器是相同的(userId 上的默认 HashParitioner),那么这是否会改变 reduceByKey 是否必须在 join 之前完全处理完所有数据?

在本例中,List(eventA, eventB) 与 Event K 而没有 eventC 的情况永远不会发生,对吗?

但是,有可能 List(eventA, eventB, eventC) 加入了 Event K,但 EventD 和 Event F 还没有减少,可能会发生吗?

Impression Events
userId  Event
1       eventA
1       eventB
1       eventC

2       eventD
2       eventF

Conversion Events
userId  Event
1       eventK

2       eventL
// The Reduce Step
final JavaPairRDD<Long, ObjectArrayList<Event>> impressionRDD = loadImpressionEvents()
    .mapToPair(event -> 

        final ObjectArrayList<Event> list = new ObjectArrayList();

        list.add(new Event(event.getTimestamp(),
            event.getCampaignIdentifier(), event.getSiteIdentifier()));

        return new Tuple2<>(
            event.getUserId(),
            list
        );
    )
    .reduceByKey((event1, event2) -> 
        // Combine impression events with the same user id
        event1.addAll(event2);
        return event1;
    );

// The Join Step 
final JavaPairRDD<Long, Tuple2<ConversionEvent, Event>> conversionImpressions = loadConversionEvents()
    .mapToPair(event -> new Tuple2<>(
        event.getUserId(),
        event
    ))
    .leftOuterJoin(impressionRDD);

【问题讨论】:

【参考方案1】:

Spark 必须在 cogroup 和 flatten 开始之前完成 shuffle,因此在 reduceByKey 正在进行时无法开始 join。

【讨论】:

以上是关于在开始取决于reduceByKey结果的连接步骤之前,Spark是不是必须完成reduceByKey步骤中的所有条目的处理?的主要内容,如果未能解决你的问题,请参考以下文章

reduceByKey和groupByKey的区别

设计模式之模板模式

随笔-学期之始

Spark中reduceByKey()和groupByKey()的区别

Spark(pyspark)如何仅在3元素元组的2个元素上reduceByKey

在 Pentaho Kettle 中的步骤初始化之前测试数据库连接?