apache beam入门之 窗口水位线和超时数据概念
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了apache beam入门之 窗口水位线和超时数据概念相关的知识,希望对你有一定的参考价值。
目录:apache beam 个人使用经验总结目录和入门指导(Java) 上一章讲了窗口概念,这一章要接着窗口中很重要的窗口水位线和超时数据概念。
beam如何确定当前时间?
例如有1个 00:00:00-00:00:03的窗口,beam 默认是当时间过了00:00:03之后,该窗口会关闭,并将该窗口内的数据往下发送。
但是有个问题,这个时间对beam来说如何确定? 是系统时间过了00:00:03就关闭窗口吗?
答案是否, 因为如果依据系统时间来确定当前时间并用来结束窗口, 那不确定性很高,因为记录到达的过程可能存在乱序、延迟等情况,系统之间也存在传输时间差距问题。
正常情况下我们是通过提取kafka记录里的某条信息,作为记录产生时间(例如读取日志里的时间),然后进行手动时间戳设定。kafkaIO手动设定读取来的时间戳见[apache beam 入门之kafkaIO的使用]
因此beam的“当前时间”,是依据 已输入时间戳的最大值来设定的。
当beam的记录中发现时间戳为00:00:04秒时,则认定此时已经过了00:00:00-00:00:03, 则此窗口关闭。
以固定时间窗口为例,进行当前时间问题验证
基于kafaIO read的例子,我们进行测试来验证上面的说法:
我们建立1个kafkaIO, 并将kafka记录的时间戳设为 (2020年0点0分 + value毫秒值)
并将kafka记录的值进行打印,如下:
.Read<String, String> kafkaRead =
KafkaIO.<String, String>read()
.withBootstrapServers("189.211.150.241:21005,189.211.150.246:21005,189.211.151.18:21005,189.211.151.36:21005")
.withTopic("beamKafkaTest")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
// 设定输入时间戳
.withTimestampFn((kv)->(new Instant(NOW_TIME + Integer.valueOf(kv.getValue()))));
PCollection<String> pTimeStr = pipeline.apply(kafkaRead.withoutMetadata())
// 转成value值
.apply(Values.create())
// 打印时间戳和value值,并原样返回
.apply(ParDo.of(new PrintTimeStampStrFn()));
我们设定1个3s的固定时间窗口
<String> pTimeStrByWindow = pTimeStr
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(3))));
接着把pTimeStrByWindow里各窗口的值合并成列表并输出
// 同1个窗口内的合成1个列表,最多聚合100个
pTimeStrByWindow.apply(Combine.globally(Sample.<String>anyCombineFn(100)).withoutDefaults())
// 输出
.apply(ParDo.of(new PrintStrFn()));
注意下文都只基于pTimeStr, 进行窗口的不同设定测试, 输入和输出不再做改动。
我用kafka客户端往kafkaBeamTest主题生产1条value为1000的消息, IDEA里打印的结果如下:
该处打印是收到kafka记录时的打印, 但并没有打印聚合窗口的结果。
我们接着生成2500和1500消息, 仍然没有对窗口做聚合。
我们再生成1个3000消息,可看到输出了[00:00:00-00:00:03)聚合窗口的结果:
说明beam认为此时已经过了2020-01-01 00:00:03秒了, 因此[00:00:00-00:00:03)窗口可以触发向下传递了。
我们再输入2000, 并不会再触发[00:00:00-00:00:03)窗口
因为已经过了窗口期了,此数据被认为是延迟数据,会被丢弃。
延迟水位线设定
上面这种处理方式存在1个问题, 如果因为分布式传输问题, 导致3000先于1000、1500、2500到达怎么办? 难道beam就要认为此时已经是00:00:03了,不管前面的数据了吗?
但如果一定要考虑迟到的数据,那我又要在什么时候才能认为窗口应该关闭呢?
因此beam里有1个延迟水位线的概念可以处理这种情况。
假设我们觉得自己的系统最多能接受5s的延迟,即收到了00:00:08记录的情况下,仍然认为后面还可能会有00:00:03-00:00;07的数据会到达且能够接受(00:00:02之前的记录太老了,就不要了),那么就可以给窗口设定5s的延迟时间,如下图所示:
使用withAllowedLateness进行设定,并需要设定是累加模式还是抛弃模式
- 累加方式accumulatingFiredPanes,指每收到延迟数据,就会把延迟数据和之前窗口里的数据合到一起进行再触发。
- 丢弃方式discardingFiredPanes,指每收到延迟数据,只会针对这一次的延迟数据做触发,不再管以前的数据。
代码例子如下:
<String> pTimeStrByWindow = pTimeStr
// 3s固定窗口
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(3)))
// 设定5s的延迟时间
.withAllowedLateness(Duration.standardSeconds(5))
// 累加方式,把之前该窗口内的数据合并到一起再计算一次
.accumulatingFiredPanes());
我们重新执行刚才的例子, 先一样生产1000,1500,2500,3000消息, 打印如下:
接着再输入2000消息,可看到再次把2000和之前的3个数据合并做了输出
(若设定的是丢弃方式discardingFiredPanes,则只会输出1个groupValue=[2000])我们再输入8000消息, 此时时间已经被我们设定成了00:00:08, 会再输出[00:00:03,00:00:07)
窗口
此时我们再输入2000消息,发现已经不会再输出该窗口了,因为已经过了水位线了。
延迟数据复杂情况
我们发现beam是在水位线中如果一收到延迟数据就马上触发。
但如果想要它再等个3s,把剩余的延迟数据都收进来再触发,怎么做?(即这个延迟数据后面可能跟着一批类似的延迟数据) 那就要用到触发器了,这个我们放到下一章apache 入门之触发器概念 来讲。
以上是关于apache beam入门之 窗口水位线和超时数据概念的主要内容,如果未能解决你的问题,请参考以下文章
Apache Beam FixedWindows 之间的延迟