从0到1Flink的成长之路(二十)-Time与Watermaker

Posted 熊老二-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路(二十)-Time与Watermaker相关的知识,希望对你有一定的参考价值。

Time与Watermaker

在这里插入图片描述
event_time

1 Time 时间分类
在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:
在这里插入图片描述
事件时间EventTime: 事件真真正正发生产生的时间;
摄入时间IngestionTime: 事件到达Flink的时间;
处理时间ProcessingTime: 事件真正被处理/计算的时间;

问题: 上面的三个时间,实际业务计算时更关注哪一个?
答案: 更关注事件时间EventTime !
因为: 事件时间更能反映事件的本质, 只要事件时间一产生就不会变化。

2 EventTime 重要性
2.1 示例一
假设,你正在去往地下停车场的路上,并且打算用手机点一份外卖。选好了外卖后,你就用在线支付功能付款了,这个时候是11点59分。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。

当你找到自己的车并且开出地下停车场的时候,已经是12点01分了。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。
在上面这个场景中你可以看到:

支付数据的事件时间是11点59分,而支付数据的处理时间是12点01分

问题:

如果要统计12点之前的订单金额,那么这笔交易是否应被统计?

答案:
应该被统计,因为该数据的真真正正的产生时间为11点59分,即该数据的事件时间为11点59 分,事件时间能够真正反映/代表事件的本质! 所以一般在实际开发中会以事件时间作为计算标准。

2.2 示例二
一条错误日志的内容为:

2020-11:11 22:59:00 error NullPointExcep --事件时间

进入Flink的时间为2020-11:11 23:00:00 --摄入时间
到达Window的时间为2020-11:11 23:00:10 --处理时间
问题:

对于业务来说,要统计1h内的故障日志个数,哪个时间是最有意义的?

答案:
EventTime事件时间,因为bug真真正正产生的时间就是事件时间,只有事件时间才能真正反
映/代表事件的本质!

2.3 示例三
某App会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。A用户在 11:01:00 对App进行操作,B用户在11:02:00操作App,但是A用户的网络不太稳定,回传日志延迟,导致在服务端先接受到B用户的消息,再接受到A用户的消息,消息乱序。

问题:

如果这个是一个根据用户操作先后顺,进行抢购的业务,那么是A用户成功还是B用户成功?

答案:
应该算A成功,因为A确实比B操作的早,但是实际中考虑到实现难度,能直接按B成功算也就是说,实际开发中希望基于事件时间来处理数据,但因为数据可能因为网络延迟等原因,出 现了乱序,按照事件时间处理起来有难度!

2.4 示例四
在实际环境中,经常会出现,因为网络原因,数据有可能会延迟一会才到达Flink实时处理系统。先来设想下面这个场景:
原本应该被该窗口计算的数据,因为网络延迟等原因晚到,就有可能丢失了
在这里插入图片描述
2.5 总结
实际开发中,希望基于事件时间来处理数据,但因为数据可能网络延迟等原因,出现了乱序或延迟到达,那么可能处理的结果不是想要的甚至出现数据丢失的情况,所以需要一种机制来解决一定程度上的数据乱序或延迟到底的问题。接下来学习Watermaker水印机制/水位线机制。
在这里插入图片描述
2.6 案例:滚动事件时间窗口
基于事件时间EventTime Tumbling Window窗口:5秒,进行聚合统计:WordCount。

package xx.xxxxxx.flink.eventtime;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* 窗口统计案例演示:滚动事件时间窗口(Tumbling EventTime Window),窗口内数据进行词频统计
*/
public class StreamTumblingEventTimeWindow {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO: step1. 设置基于事件时间窗口统计
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 2. 数据源-source
SingleOutputStreamOperator<String> inputStream = env
.socketTextStream("node1.itcast.cn", 9999)
.filter(line -> null != line && line.trim().split(",").length == 3);
/*
数据格式:1000,a,3 2000,b,2 5000,a,9
*/
// TODO: step2. 指定事件时间EventTime字段
SingleOutputStreamOperator<String> timeStream = inputStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) { @Override
public long extractTimestamp(String line) {
String[] array = line.trim().split(",");
return Long.parseLong(array[0]);
} }
);
// 3. 数据转换-transformation
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = timeStream.map(
new MapFunction<String, Tuple2<String, Integer>>() { @Override
public Tuple2<String, Integer> map(String line) throws Exception {
String[] array = line.trim().split(",");
return Tuple2.of(array[1], Integer.parseInt(array[2]));
} }
);
// TODO: 事件时间窗口设置
SingleOutputStreamOperator<Tuple2<String, Integer>> sumStream = tupleStream
// 按照单词分组
.keyBy(0)
// TODO: step3. 设置事件时间窗口大小为5秒
.window(TumblingEventTimeWindows.of(Time.seconds(5))) // .timeWindow(Time.seconds(5))
// 窗口内聚合操作
.sum(1);
// 4. 数据终端-sink
sumStream.printToErr();
/*
测试数据:
1000,a,1
2000,a,1
5000,a,1 --> 触发窗口计算,此条数据不包含
9999,a,1 --> 触发窗口计算,此条数据包含
11000,a,2
14000,b,1
14999,b,1 --> 触发窗口计算,此条数据包含
输出结果:
(a,2)
(a,2)
(a,2)
(b,2)
*/
// 5. 触发执行-execute
env.execute(StreamTumblingEventTimeWindow.class.getSimpleName());
} }

为了更好的看到窗口时间范围,进行window窗口操作后,使用apply函数,对窗口数据进行聚合操作,打印窗口范围,代码如下所示:

package xx.xxxxxx.flink.eventtime;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* 窗口统计案例演示:滚动事件时间窗口(Tumbling EventTime Window),窗口内数据进行词频统计
*/
public class StreamEventTimeWindowApply {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO: step1. 设置基于事件时间窗口统计
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 2. 数据源-source
SingleOutputStreamOperator<String> inputStream = env
.socketTextStream("node1.itcast.cn", 9999)
.filter(line -> null != line && line.trim().split(",").length == 3);
/*
数据格式:1000,a,3 2000,b,2 5000,a,9
*/
// TODO: step2. 指定事件时间EventTime字段
SingleOutputStreamOperator<String> timeStream = inputStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) { @Override
public long extractTimestamp(String line) {
String[] array = line.trim().split(",");
return Long.parseLong(array[0]);
} }
);
// 3. 数据转换-transformation
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = timeStream.map(
new MapFunction<String, Tuple2<String, Integer>>() { @Override
public Tuple2<String, Integer> map(String line) throws Exception {
String[] array = line.trim().split(",");
return Tuple2.of(array[1], Integer.parseInt(array[2]));
} }
);
// TODO: 事件时间窗口设置
FastDateFormat dateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS");
SingleOutputStreamOperator<Tuple2<String, Integer>> sumStream = tupleStream
// 按照单词分组
.keyBy(0)
// TODO: step3. 设置事件时间窗口大小为5秒
.window(TumblingEventTimeWindows.of(Time.seconds(5))) // .timeWindow(Time.seconds(5))
// 窗口内聚合操作: 使用apply函数
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { @Override
public void apply(Tuple tuple,
TimeWindow window,
Iterable<Tuple2<String, Integer>> input,
Collector<Tuple2<String, Integer>> out) throws Exception {
// 获取窗口开始start和结束end
String windowStart = dateFormat.format(window.getStart());
String windowEnd = dateFormat.format(window.getEnd());
String word = tuple.toString() ;
// 对窗口内数据进行聚合操作:累加和
int sum = 0 ;
for(Tuple2<String, Integer> item: input){
sum += item.f1 ; }
// 输出内容
String output = "window[" + windowStart + " ~ " + windowEnd + "] -> " + word;
out.collect(Tuple2.of(output, sum));
}
});
// 4. 数据终端-sink
sumStream.printToErr();
/*
测试数据:
1000,a,1
2000,a,1
5000,a,1 --> 触发窗口计算,此条数据不包含
9999,a,1 --> 触发窗口计算,此条数据包含
11000,a,2
14000,b,1
14999,b,1 --> 触发窗口计算,此条数据包含
输出结果:
(window[1970-01-01 08:00:00:000 ~ 1970-01-01 08:00:05:000] -> (a),2)
(window[1970-01-01 08:00:05:000 ~ 1970-01-01 08:00:10:000] -> (a),2)
(window[1970-01-01 08:00:10:000 ~ 1970-01-01 08:00:15:000] -> (a),2)
(window[1970-01-01 08:00:10:000 ~ 1970-01-01 08:00:15:000] -> (b),2)
*/
// 5. 触发执行-execute
env.execute(StreamEventTimeWindowApply.class.getSimpleName());
} }

2.7 起始窗口计算
使用事件时间EventTime进行窗口计算时,首先依据第一条数据的事件时间计算第一个窗口时间范围(Time Range),再根据窗口大小与滑动大小进行计算第二个窗口,第三个窗口,以此类推下一个窗口时间范围。
查看org.apache.flink.streaming.api.windowing.windows.TimeWindow类,其中方法:
getWindowStartWithOffset计算第一个窗口时间范围:

在这里插入图片描述

以上是关于从0到1Flink的成长之路(二十)-Time与Watermaker的主要内容,如果未能解决你的问题,请参考以下文章

从0到1Flink的成长之路(二十一)-异步IO

从0到1Flink的成长之路(二十)-Flink 高级特性之Checkpoint 配置方式

从0到1Flink的成长之路(二十)-案例:时间会话窗口

从0到1Flink的成长之路(二十)-扩展:并行度(Parallelism)

从0到1Flink的成长之路(二十一)-Sink

从0到1Flink的成长之路(二十)-Flink 高级特性之状态分类