Flink Watermark分配策略
Posted 爱喝cola的
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink Watermark分配策略相关的知识,希望对你有一定的参考价值。
Flink Watermark分配策略
WaterMark是Flink为了处理Event Time窗口计算提出的一种机制,本质上是一种时间戳。主要用来处理乱序数据或者延迟数据的,这里通常watermark机制结合window来实现(watermark用来触发window的计算)
说到Event Time,就要提到Flink的三种时间:
-
事件时间(Event Time): 事件现实发生的事件,是事件数据本身的一个属性(类似订单的创建时间)。使用这个语义时需要指定数据哪个字段表示时间,必须设置Watermark。使用EventTime时,数据可能是乱序的。
-
处理时间(Processing Time): 数据流进入到具体某个算子时的系统时间。不需要Watermark机制,只依赖当前节点的操作系统时间
-
提取时间(Ingestion Time): 数据进入Flink系统的时间,也就是flink读取数据源的时间(source)。从source到下游各个算子可能有多个计算环节,任何一个算子处理速度的快慢都可能影响下游算子的Processing Time。而Ingestion Time定义的是数据流入flink系统的时间,不会被下游的处理速度影响,因此ingestion Time是Event Time和Processing Time的一个折中方案,同样不需要设置Watermark,也不需要太多缓存,延迟较低。
旧版本的Watermark分配策略
- AssignerWithPeriodicWatermarks: 周期性生成watermark(可以取决于event time,或基于处理时间)
- AssignerWithPunctuatedWatermarks: 为每一个元素生成watermark(每来一个元素都进行一次判断,更消耗性能)
通常情况下会使用第一种机制,除更节省性能外,也可以避免已到元素长时间等待
当watermark完全基于event time时,如果没有元素到达,则watermark不会被更新。当一段时间没有元素到达,那么窗口中已经到达的元素将会等待很久才会被输出
为了避免这种情况,可以使用周期性的watermark分配器,这些分配器不仅仅基于event time进行分配,比如,可以使用一个分配器,当一段时间没有接收到新的event时,则将当前时间作为watermark。
以常用的周期性策略举例
BoundedOutOfOrdernessTimestampExtractor类 是AssignerWithPeriodicWatermarks接口的一个实现类,实现extractTimestamp方法,指定元素的哪个属性作为时间戳,并返回。
socketStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>()
@Override
public long extractTimestamp(String element)
return 0; // 一般这里返回的是事件的一个时间属性
)
新版本的Watermark分配策略
flink1.11版本后建议使用的watermark生成策略,当创建DataStream对象后,使用如下方法指定
assignTimestampsAndWatermarks(WatermarkStrategy[T])
WatermarkStrategy接口重写了两个方法createWatermarkGenerator,createTimestampAssigner,返回下边两个对象:
-
TimestampAssigner:主要指定流中数据元素的某个字段作为事件的时间戳,时间戳的分配是生成watermark的基础
-
WatermarkGenerator:主要负责按照既定方式,基于时间戳生成水位线,在WatermarkGzaienerator接口中有onEvent,onPeriodicEmit方法
源码
WatermarkStrategy的四种策略:
- 固定乱序长度策略(forBoundedOutOfOrderness)
- 单调递增策略(forMonotonousTimestamps)
- 自定义策略(forGenerator):一般内置的分配策略已经能够满足需求,也可以自定义类实现WatermarkStrategy接口
- 不生成策略(noWatermarks)
WatermarkStrategy的分配策略只需要实现WatermarkGenerator接口即可。该接口中有两个方法:
-
onEvent方法在接收到每一个事件数据时就会触发调用,该方法第一个参数event表示接收的事件数据,第二个参数eventTimeStamp表示事件时间戳,第三个参数output可以output.emitWatermark方法生成一个watermark。
-
onPeriodicEmit方法会周期性触发,比每个元素生成一个watermark效率高,接收一个watermarkOutput类型的参数output,内部可以用output.emitWatermark方法生成一个watermark。周期时间为处理时间,可以调用环境配置的env.setAutoWatermarkInterval() 方法来设置,默认为200ms
WatermarkGenerator
那么,就可以讲解下Flink内置的两种分配策略!!
forBoundedOutOfOrderness
需要注意的是,每来1条事件数据,只是更新了事件流的最大时间戳,并不会直接发送水印。
只有 @link ExecutionConfig#getAutoWatermarkInterval() 周期性间隔到了以后,水印才会被发送。
forMonotonousTimestamps
-
forMonotonousTimestamps使用AscendingTimestampsWatermarks类来实现,AscendingTimestampsWatermarks继承自BoundedOutOfOrdernessWatermarks类
-
相当于没有设置乱序延迟时间的forBoundedOutOfOrderness
案例
// 测试
public class WatermarkTest
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//周期性生成watermarkTest
env.getConfig().setAutoWatermarkInterval(100);
// 读取数据源,并行度为 1
DataStream<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Alice", "./prod?id=200", 3500L),
new Event("Bob", "/prod?id=2", 2500L),
new Event("Alice", "./prod?id=300", 3600L),
new Event("Bob", "./home", 3000L),
new Event("Bob", "./prod?id=1", 2300L),
new Event("Bob", "./prod?id=3", 3300L))
//有序的Watermarks
// .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
// .withTimestampAssigner(new SerializableTimestampAssigner<Event>()
// @Override
// public long extractTimestamp(Event element, long recordTimestamp)
// return element.timestamp; //默认是毫秒
//
// ))
//乱序的Watermarks
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>()
@Override
public long extractTimestamp(Event element, long recordTimestamp)
return element.timestamp;
)
);
env.execute();
WatermarkStrategyWithIdleness
WatermarkStrategyWithIdleness是空闲流的实现类,同样也实现了WatermarkGenerator接口,如果某时间段内没有记录在分区内流动,则该分区被标记为空闲,不会阻碍下游操作程序中水印的进度
如果某些分区数据很少,并且可能没有数据,那么空闲流的设置就显得很重要,如果没有空闲,这些流可以暂停整个事件时间的申请进度
//写一个小示例
public class TumblingEventWindowDemo
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketStream = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Tuple3<String,String, Long>> s = env.fromElements(
Tuple3.of("1","", 1L),
Tuple3.of("1","", 1L),
Tuple3.of("1","", 2L),
Tuple3.of("1","", 2L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String,Long>>()
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp)
return element.f1;
)
.withIdleness(Duration.ofSeconds(50)) //表示50秒内没有元素到达,将该流标记为空闲流
);
env.execute();
源码解析可以去Flink官网查看。。
以上是关于Flink Watermark分配策略的主要内容,如果未能解决你的问题,请参考以下文章
Flink 源码解读系列 DataStream 带 Watermark 生成的时间戳分配器
Flink 窗口延迟数据处理 AllowedLateness