SlidingWindows 用于 Apache Beam 上的慢速数据(大间隔)
Posted
技术标签:
【中文标题】SlidingWindows 用于 Apache Beam 上的慢速数据(大间隔)【英文标题】:SlidingWindows for slow data (big intervals) on Apache Beam 【发布时间】:2018-11-07 18:15:49 【问题描述】:我正在使用 Chicago Traffic Tracker 数据集,其中每 15 分钟发布一次新数据。当有新数据可用时,它代表距离“实时”(example,查找_last_updt
)10-15 分钟的记录。
例如,在 00:20,我得到时间戳为 00:10 的数据;在 00:35,我从 00:20 开始;在 00:50,我从 00:40 开始。因此,我可以“固定”获取新数据的间隔(每 15 分钟一次),尽管时间戳的间隔略有变化。
我正在尝试在 Dataflow (Apache Beam) 上使用这些数据,为此我正在使用滑动窗口。我的想法是收集和处理 4 个连续的数据点(4 x 15 分钟 = 60 分钟),理想情况下,一旦有新的数据点可用,就更新我对总和/平均值的计算。为此,我从代码开始:
PCollection<TrafficData> trafficData = input
.apply("MapIntoSlidingWindows", Window.<TrafficData>into(
SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
.every(Duration.standardMinutes(15))) . // interval to get new data
.triggering(AfterWatermark
.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
不幸的是,当我从我的输入中接收到一个新的数据点时,我没有从我所追求的GroupByKey
获得新的(更新的)结果。
我的 SlidingWindows 有问题吗?还是我错过了什么?
【问题讨论】:
您的意思是在第一个元素之后没有得到任何元素,还是在第一次触发后没有得到添加到窗口的后期元素?如果是后者,那么很可能是allowedLateness(Duration.ZERO)
引起的,这会丢弃所有的后期元素。
嗨@Anton,我在第一次发射后没有得到迟到的元素,即使这些元素应该在同一个“窗口”上。例如,在 01:14 到达的元素应该包含在从 00:15 开始的窗口中,但事实并非如此。我对allowedLateness
的理解是,将其设置为大于 0(比如说 5 分钟),将允许包含在预计关闭窗口之后到达的元素(因此,如果 01:14 的元素恰好在 01 到达:18,它仍将包含在 01:15 关闭的窗口中)。如果我的理解有误,请告诉我。
【参考方案1】:
一个问题可能是水印超出了窗口的末尾,并丢弃了所有后面的元素。您可以尝试在水印通过后几分钟:
PCollection<TrafficData> trafficData = input
.apply("MapIntoSlidingWindows", Window.<TrafficData>into(
SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
.every(Duration.standardMinutes(15))) . // interval to get new data
.triggering(AfterWatermark
.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
.withAllowedLateness(Duration.standardMinutes(15))
.accumulatingFiredPanes());
让我知道这是否有帮助。
【讨论】:
我预计这个“水印后几分钟”会出现在我已经包含的 withAllowedLateness 上。如果不是这种情况,您能否解释这两个之间有什么区别?谢谢! 在您的代码中,withAllowedLateness
收到了Duration.ZERO
,这意味着您的管道将忽略任何后期元素。您可以传递大于零的持续时间,让您的管道在该窗口中等待更多元素的时间更长。 LMK 如果有帮助的话。
好吧,愚蠢的问题:如果您查看example,您会看到_last_updt
字段。我在此字段上使用withTimestampAttribute
导入数据。因此,如果时间为“00:15”的数据点在“00:30”到达我的系统,我是否需要withAllowedLateness
来解决这个问题? IE。考虑到我的数据延迟,我是否应该将所有数据都视为每次都延迟?
这不是一个愚蠢的问题,但您能详细说明一下您是如何完成withTimestampAttribute
的吗?在 Apache Beam 中,给定源是无限的,默认行为是根据接收到的当前时间为每个元素提供时间戳。如果您使用的是 c.outputWithTimestamp(c.element(), last_updt)
之类的东西,则 beam 根据每个元素的关联 事件时间 将元素划分为窗口。所以它不会看你的系统时间
** 如果您使用类似(手动更新时间戳)c.outputWithTimestamp(c.element(), last_updt)
【参考方案2】:
所以@Pablo(根据我的理解)给出了正确的答案。但我有一些建议不适合发表评论。
我想问你是否需要滑动窗口?据我所知,固定窗口可以为您完成这项工作,并且在计算上也更简单。由于您正在使用累积触发的窗格,因此您不需要使用滑动窗口,因为您的下一个 DoFn 函数将已经对累积的窗格进行平均。
至于代码,我对早期和晚期触发逻辑进行了更改。我还建议增加窗口大小。由于您知道数据每 15 分钟出现一次,因此您应该在 15 分钟后关闭窗口,而不是在 15 分钟后关闭。但是您也不想选择最终会与 15 的倍数(如 20)发生冲突的窗口,因为在 60 分钟时您将遇到同样的问题。所以选择一个与 15 互质的数字,例如 19。还允许延迟输入。
PCollection<TrafficData> trafficData = input
.apply("MapIntoFixedWindows", Window.<TrafficData>into(
FixedWindows.of(Duration.standardMinutes(19))
.triggering(AfterWatermark.pastEndOfWindow()
// fire the moment you see an element
.withEarlyFirings(AfterPane.elementCountAtLeast(1))
//this line is optional since you already have a past end of window and a early firing. But just in case
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
.withAllowedLateness(Duration.standardMinutes(60))
.accumulatingFiredPanes());
如果这能解决您的问题,请告诉我!
编辑
所以,我无法理解您是如何计算上述示例的,所以我使用的是通用示例。下面是一个通用的平均函数:
public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double>
public static class Accum
int sum = 0;
int count = 0;
@Override
public Accum createAccumulator() return new Accum();
@Override
public Accum addInput(Accum accum, Integer input)
accum.sum += input;
accum.count++;
return accum;
@Override
public Accum mergeAccumulators(Iterable<Accum> accums)
Accum merged = createAccumulator();
for (Accum accum : accums)
merged.sum += accum.sum;
merged.count += accum.count;
return merged;
@Override
public Double extractOutput(Accum accum)
return ((double) accum.sum) / accum.count;
为了运行它,您需要添加以下行:
PCollection<Double> average = trafficData.apply(Combine.globally(new AverageFn()));
由于您目前使用累积触发触发器,这将是解决解决方案的最简单的编码方式。
但是,如果您想使用丢弃火窗格窗口,则需要使用 PCollectionView
来存储上一个平均值并将其作为侧面输入传递给下一个平均值,以便跟踪值。这在编码中稍微复杂一些,但肯定会提高性能,因为每个窗口都会进行持续的工作,这与累积触发不同。
这对您生成自己的函数来丢弃火窗格窗口是否足够有意义?
【讨论】:
嗨@Haris,感谢您的解释。如果我使用大小为 19 的固定窗口,每个窗口将只有 1 个元素,不是吗?我不明白这将如何为我的数据提供“最后一小时的移动平均线”。 Tbh 我想知道我是否应该使用discardingFiredPanes
而不是为那个逻辑积累......
使用 discardingFiredPanes
的计算成本会更低,而且一个好的长期决策位需要您重新构建移动平均线的逻辑。
您能给我举个例子来说明您的用例中的移动平均线吗?或者我可以给你一个通用函数作为实现移动平均线的例子。
如果您使用 19 分钟,平均而言,您的列表中将有 1 个元素,但您的窗口中最多将有 2 个元素。 1/5 的时间您将拥有两个元素,而 4/5 的时间您将在窗口中拥有一个元素。如果你愿意,我可以解释这个逻辑。
我计算移动平均线的一个例子:link。基本上我的平均值将基于最后 4 或 5 个值的总和(除以 4 或 5)。我说“4 或 5”是因为有时我在 1 小时内得到 4 个不同的值,有时我得到 5。所以我不能将窗口设置为在 X 个元素后触发,而是我需要根据元素时间来做。
以上是关于SlidingWindows 用于 Apache Beam 上的慢速数据(大间隔)的主要内容,如果未能解决你的问题,请参考以下文章
PHP curl 不适用于 Windows 和 Apache