如何针对特定用例在 reduceByKeyAndWindow() 中实现 invFunc

Posted

技术标签:

【中文标题】如何针对特定用例在 reduceByKeyAndWindow() 中实现 invFunc【英文标题】:How to implement invFunc in reduceByKeyAndWindow() for specific use case 【发布时间】:2015-10-18 11:57:54 【问题描述】:

我正在使用火花流来处理文件流。多个文件成批到达并从所有文件中激发过程数据。 我的用途是获取进入后续批次的文件的每条记录的总和。例如:

key: key_1 value: 10 --> batch1 key: key_1 value: 05 --> batch1 key: key_1 value: 19 --> batch2 key: key_1 value: 11 --> batch3 key: key_1 value: 10 --> batch4

我需要如下输出:

处理第一批后,我需要输出为 => key: key_1 val: 15 处理第二批后,我需要输出为 => key: key_1 val: 34 处理第三批后,我需要输出为 => key: key_1 val: 45 处理第 4 批后,我需要输出为 => key: key_1 val: 55 处理第 5 批后,我需要输出为 => key: key_1 val: 55

我的 reduceByKeyAndWindow() 代码如下:

JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval));

private static final Function2<Summary, Summary, Summary> GET_GRP_SUM = new Function2<Summary, Summary, Summary>() 
    private static final long serialVersionUID = 1L;

    public Summary call(Summary s1, Summary s2) throws Exception 
        try 

            Summary s = new Summary();

            long grpCnt = s1.getDelta() + s2.getDelta();
            s.setDeltaSum(grpCnt);

            return s;
         catch (Exception e) 
            logger.error(" ==== error in CKT_GRP_SUM ==== :"+e);
            return new Summary();
        
    

;

我从上面的实现中得到的输出如下:

处理第一批后,我得到输出 => key: key_1 value: 15 处理第二批后,我得到输出 => key: key_1 value: 34 处理第三批后,我得到输出 => key: key_1 value: 30 处理第 4 批后,我得到输出 => key: key_1 value: 21 处理第 5 批后,我得到输出 => key: key_1 value: 10

根据 reduceByKeyAndWindow() 的输出,它似乎正在计算先前批次数据和当前批次数据的聚合。 但我的要求是对上一批的聚合数据和当前的批数据进行聚合。这样根据上面的例子 它应该在第 4 和第 5 批结束时输出为 [(((15)+19)+11)+10 = 55]。

我读到了 reduceByKeyAndWindow()invFunc 可以实现以获得预期的输出。我试图实现它类似于 GET_GRP_SUM 但它没有给我预期的结果。任何有关正确实施以获得所需输出的帮助将不胜感激。

我正在使用 java 1.8.45 和 spark 版本 1.4.1 和 hadoop 版本 2.7.1。

我用 reduceByKeyAndWindow() 对 invFunc 的实现

JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, INV_GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval));

private static final Function2<Summary, Summary, Summary> INV_GET_GRP_SUM = new Function2<Summary, Summary, Summary>() 
    private static final long serialVersionUID = 1L;

    public Summary call(Summary s1, Summary s2) throws Exception 
        try 

            Summary s = new Summary();

            long grpCnt = s1.getDelta() + s2.getDelta();
            s.setDeltaSum(grpCnt);

            return s;

         catch (Exception e) 
            logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
            return new Summary();
        
    
;

我已经像上面那样实现了我的 invFunc,这并没有给我预期的输出。我这里分析的是 s1 和 s2 给我之前批次的聚合值,我觉得我不太确定。

我尝试更改我的 invFunc 实现,如下所示:

private static final Function2<Summary, Summary, Summary> INV_GET_GRP_SUM = new Function2<Summary, Summary, Summary>() 
    private static final long serialVersionUID = 1L;

    public Summary call(Summary s1, Summary s2) throws Exception 
        try 

            return s1;

         catch (Exception e) 
            logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
            return new Summary();
        
    
;

这个实现给了我预期的输出。但我面临的问题是带有 invFunc 的 reduceByKeyAndWindow() 不会自动删除旧键。我又看了几篇文章,发现我需要编写自己的过滤器函数,该函数将删除具有 0 值(无值)的旧键。

再次,我不确定如何编写过滤函数来删除具有 0 值(无值)的旧键,因为我没有具体了解 s1 和 s2 返回到 INV_GET_GRP_SUM 的内容。

【问题讨论】:

【参考方案1】:

使用 UpdateStateByKey

您是否已从 Streaming API 签出 updateStateByKey()?它允许您在批处理间隔之间维护键值对的状态,并使用与之关联的新信息(值)不断更新每个键。这适用于您的用例,因为之前的数据状态将包含每个键的聚合总和,直到最新状态。有关此功能的更多信息,请参阅其用法here 和示例here。

关于该函数的一个注意事项是,它需要启用检查点,以便可以在每次迭代时保存状态。

(编辑:)

使用 ReduceByKeyAndWindow

关于使用reduceKeyAndWindow(),您的普通funcinvFunccall() 方法的第二个参数是添加的新元素和旧元素分别减去的元素。本质上,您是通过在新时间片上添加元素(您正在使用GET_GRP_SUM)并从旧时间片中减去元素(您没有使用INV_GET_GRP_SUM)来实现此窗口化缩减。请注意,在您的第一次尝试中,您将旧值重新添加回当前窗口中的值,而在第二次尝试中,您将忽略移出窗口的值。

要从移出窗口的元素中减去旧值,您可能希望 INV_GET_GRP_SUM 具有类似于以下的逻辑(并且可以找到类似的正确实现 here):

public Summary call(Summary s1, Summary s2) throws Exception 
    try 

        long grpCnt = s1.getDelta() - s2.getDelta();
        s.setDeltaSum(grpCnt);

     catch (Exception e) 
        logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
        return new Summary();
    

对于您的另一个问题,似乎确实有一种方法可以过滤掉过期的密钥,并且正如您所提到的,它确实涉及编写过滤器函数。正如您从API 中看到的那样,此过滤器函数接受您的键值对并返回一个布尔值,该值将设置为 true(如果您想保留该对)或 false(如果您想删除该对) .在这种情况下,由于您想在值为零时删除您的配对,您可以执行以下操作:

private static final Function<scala.Tuple2<String, Summary>, Boolean> FILTER_EXPIRED = new Function<scala.Tuple2<String, Summary>, Boolean>() 
    public Boolean call(scala.Tuple2<String, Summary> s)  
        return s.productElement(1) > 0; 
    

然后你可以把它传递给你的reduceByKeyAndWindow()函数(注意你应该在这里传入分区参数来确定你的DStream中的RDD将使用多少个分区):

JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, INV_GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval), partitions, FILTER_EXPIRED);

【讨论】:

感谢 Rohan 的快速回复。我需要提到的一件事是,一批有大约 100K-500K 记录要处理,并且批次上的密钥会随着时间而变化。在这种情况下,如果我使用 updateStateByKey(),存储记录键的缓存将继续增加,而旧记录在 2-3 批次后将永远不会重复。我在问题中解释的示例场景只是为了理解用例。这就是我决定使用 reduceByKeyAndWindow() 的原因,这样 Spark 将通过 2-3 个批次的窗口在上一个批次和当前批次中查找相同的记录(具有相同的键)。 您实际上可以将您的密钥配置为在一段时间后使用updateStateByKey() 删除(如果您没有看到任何新的密钥值)。但是,如果您设置使用 reduceByKeyAndWindow(),您可以添加您尝试使用 invFunc 运行的代码吗? 嗨,Rohan,我已经通过实现 invFunc 编辑了我的问题。请看一下,如果您能帮我过滤旧的键值,请告诉我。 嘿@user3478678,我为您的用例添加了一些信息reduceByKeyAndWindow()。希望对您有所帮助!

以上是关于如何针对特定用例在 reduceByKeyAndWindow() 中实现 invFunc的主要内容,如果未能解决你的问题,请参考以下文章

用于失败测试用例的 J Meter 电子邮件配置

测试用例在完成 beforeAll 之前执行。?

delphi用例在AdvBadgeGlowButton1标题上

丢帧测试用例在 Qual-E 中失败

使用 UI 测试用例在 android studio 中运行测试

为啥我的 Python 测试用例在这个编码挑战中失败了?