SlidingWindows 用于 Apache Beam 上的慢速数据(大间隔)

Posted

技术标签:

【中文标题】SlidingWindows 用于 Apache Beam 上的慢速数据(大间隔)【英文标题】:SlidingWindows for slow data (big intervals) on Apache Beam 【发布时间】:2018-11-07 18:15:49 【问题描述】:

我正在使用 Chicago Traffic Tracker 数据集,其中每 15 分钟发布一次新数据。当有新数据可用时,它代表距离“实时”(example,查找_last_updt)10-15 分钟的记录。

例如,在 00:20,我得到时间戳为 00:10 的数据;在 00:35,我从 00:20 开始;在 00:50,我从 00:40 开始。因此,我可以“固定”获取新数据的间隔(每 15 分钟一次),尽管时间戳的间隔略有变化。

我正在尝试在 Dataflow (Apache Beam) 上使用这些数据,为此我正在使用滑动窗口。我的想法是收集和处理 4 个连续的数据点(4 x 15 分钟 = 60 分钟),理想情况下,一旦有新的数据点可用,就更新我对总和/平均值的计算。为此,我从代码开始:

PCollection<TrafficData> trafficData = input        
    .apply("MapIntoSlidingWindows", Window.<TrafficData>into(
        SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
            .every(Duration.standardMinutes(15))) .     // interval to get new data
        .triggering(AfterWatermark
                        .pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes());

不幸的是,当我从我的输入中接收到一个新的数据点时,我没有从我所追求的GroupByKey 获得新的(更新的)结果。

我的 SlidingWindows 有问题吗?还是我错过了什么?

【问题讨论】:

您的意思是在第一个元素之后没有得到任何元素,还是在第一次触发后没有得到添加到窗口的后期元素?如果是后者,那么很可能是allowedLateness(Duration.ZERO)引起的,这会丢弃所有的后期元素。 嗨@Anton,我在第一次发射后没有得到迟到的元素,即使这些元素应该在同一个“窗口”上。例如,在 01:14 到达的元素应该包含在从 00:15 开始的窗口中,但事实并非如此。我对allowedLateness 的理解是,将其设置为大于 0(比如说 5 分钟),将允许包含在预计关闭窗口之后到达的元素(因此,如果 01:14 的元素恰好在 01 到达:18,它仍将包含在 01:15 关闭的窗口中)。如果我的理解有误,请告诉我。 【参考方案1】:

一个问题可能是水印超出了窗口的末尾,并丢弃了所有后面的元素。您可以尝试在水印通过后几分钟:

PCollection<TrafficData> trafficData = input        
    .apply("MapIntoSlidingWindows", Window.<TrafficData>into(
        SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
            .every(Duration.standardMinutes(15))) .     // interval to get new data
        .triggering(AfterWatermark
                        .pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.standardMinutes(15))
        .accumulatingFiredPanes());

让我知道这是否有帮助。

【讨论】:

我预计这个“水印后几分钟”会出现在我已经包含的 withAllowedLateness 上。如果不是这种情况,您能否解释这两个之间有什么区别?谢谢! 在您的代码中,withAllowedLateness 收到了Duration.ZERO,这意味着您的管道将忽略任何后期元素。您可以传递大于零的持续时间,让您的管道在该窗口中等待更多元素的时间更长。 LMK 如果有帮助的话。 好吧,愚蠢的问题:如果您查看example,您会看到_last_updt 字段。我在此字段上使用withTimestampAttribute 导入数据。因此,如果时间为“00:15”的数据点在“00:30”到达我的系统,我是否需要withAllowedLateness 来解决这个问题? IE。考虑到我的数据延迟,我是否应该将所有数据都视为每次都延迟? 这不是一个愚蠢的问题,但您能详细说明一下您是如何完成withTimestampAttribute 的吗?在 Apache Beam 中,给定源是无限的,默认行为是根据接收到的当前时间为每个元素提供时间戳。如果您使用的是 c.outputWithTimestamp(c.element(), last_updt) 之类的东西,则 beam 根据每个元素的关联 事件时间 将元素划分为窗口。所以它不会看你的系统时间 ** 如果您使用类似(手动更新时间戳)c.outputWithTimestamp(c.element(), last_updt)【参考方案2】:

所以@Pablo(根据我的理解)给出了正确的答案。但我有一些建议不适合发表评论。

我想问你是否需要滑动窗口?据我所知,固定窗口可以为您完成这项工作,并且在计算上也更简单。由于您正在使用累积触发的窗格,因此您不需要使用滑动窗口,因为您的下一个 DoFn 函数将已经对累积的窗格进行平均。

至于代码,我对早期和晚期触发逻辑进行了更改。我还建议增加窗口大小。由于您知道数据每 15 分钟出现一次,因此您应该在 15 分钟后关闭窗口,而不是在 15 分钟后关闭。但是您也不想选择最终会与 15 的倍数(如 20)发生冲突的窗口,因为在 60 分钟时您将遇到同样的问题。所以选择一个与 15 互质的数字,例如 19。还允许延迟输入。

    PCollection<TrafficData> trafficData = input        
        .apply("MapIntoFixedWindows", Window.<TrafficData>into(
            FixedWindows.of(Duration.standardMinutes(19)) 
                        .triggering(AfterWatermark.pastEndOfWindow()
                            // fire the moment you see an element 
                            .withEarlyFirings(AfterPane.elementCountAtLeast(1))
                            //this line is optional since you already have a past end of window and a early firing. But just in case 
                            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
                        .withAllowedLateness(Duration.standardMinutes(60))
                        .accumulatingFiredPanes());

如果这能解决您的问题,请告诉我!

编辑

所以,我无法理解您是如何计算上述示例的,所以我使用的是通用示例。下面是一个通用的平均函数:

public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> 
  public static class Accum 
    int sum = 0;
    int count = 0;
  

  @Override
  public Accum createAccumulator()  return new Accum(); 

  @Override
  public Accum addInput(Accum accum, Integer input) 
      accum.sum += input;
      accum.count++;
      return accum;
  

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) 
    Accum merged = createAccumulator();
    for (Accum accum : accums) 
      merged.sum += accum.sum;
      merged.count += accum.count;
    
    return merged;
  

  @Override
  public Double extractOutput(Accum accum) 
    return ((double) accum.sum) / accum.count;
  

为了运行它,您需要添加以下行:

PCollection<Double> average = trafficData.apply(Combine.globally(new AverageFn()));

由于您目前使用累积触发触发器,这将是解决解决方案的最简单的编码方式。

但是,如果您想使用丢弃火窗格窗口,则需要使用 PCollectionView 来存储上一个平均值并将其作为侧面输入传递给下一个平均值,以便跟踪值。这在编码中稍微复杂一些,但肯定会提高性能,因为每个窗口都会进行持续的工作,这与累积触发不同。

这对您生成自己的函数来丢弃火窗格窗口是否足够有意义?

【讨论】:

嗨@Haris,感谢您的解释。如果我使用大小为 19 的固定窗口,每个窗口将只有 1 个元素,不是吗?我不明白这将如何为我的数据提供“最后一小时的移动平均线”。 Tbh 我想知道我是否应该使用discardingFiredPanes 而不是为那个逻辑积累...... 使用 discardingFiredPanes 的计算成本会更低,而且一个好的长期决策位需要您重新构建移动平均线的逻辑。 您能给我举个例子来说明您的用例中的移动平均线吗?或者我可以给你一个通用函数作为实现移动平均线的例子。 如果您使用 19 分钟,平均而言,您的列表中将有 1 个元素,但您的窗口中最多将有 2 个元素。 1/5 的时间您将拥有两个元素,而 4/5 的时间您将在窗口中拥有一个元素。如果你愿意,我可以解释这个逻辑。 我计算移动平均线的一个例子:link。基本上我的平均值将基于最后 4 或 5 个值的总和(除以 4 或 5)。我说“4 或 5”是因为有时我在 1 小时内得到 4 个不同的值,有时我得到 5。所以我不能将窗口设置为在 X 个元素后触发,而是我需要根据元素时间来做。

以上是关于SlidingWindows 用于 Apache Beam 上的慢速数据(大间隔)的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark 3.2 内置支持会话窗口

Session Windows

PHP curl 不适用于 Windows 和 Apache

.htaccess for apache 2.2 不适用于 apache 2.4 vagrant 开发框

Apache 虚拟主机不适用于子域

Apache 2.4 - 简单的反向代理 - 不适用于多个条目