1.22.FLINK WatermarkFlink窗口(Window)watermark有什么用?如何使用Watermarks处理乱序的数据流?机制及实例详解生成方式代码实例
Posted to.to
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1.22.FLINK WatermarkFlink窗口(Window)watermark有什么用?如何使用Watermarks处理乱序的数据流?机制及实例详解生成方式代码实例相关的知识,希望对你有一定的参考价值。
1.22.揭开Watermark的神秘面纱
1.22.1.问题导读
1.22.2.流数据异常该怎么办:
1.22.2.1.Flink窗口(Window)
1.22.2.2.事件时间
1.22.3.watermark有什么用?
1.22.4.如何使用Watermarks处理乱序的数据流?
1.23.Watermark机制及实例详解
1.24.Watermark生成方式
1.25.Watermark代码实例
1.26.Watermark的案例
1.22.揭开Watermark的神秘面纱
以下转自:https://www.aboutyun.com/thread-26843-1-1.html
1.22.1.问题导读
1.什么是水位线?
2.水位线的作用是什么?
3.水位线的本质是什么?
4.水位线是为了解决什么问题?
刚接触Flink,可能你听说过“水印”或则“水位线”,但是技术领域为何会出现非技术词汇,到底什么是水位线,这两个陌生的词汇,给Flink蒙上了一层神秘的面纱。
这里我们就为大家揭开Watermark的神秘面纱。
回到Flink场景中来,Flink被称为终极流式框架,它是真正的流式处理,认为批处理是流处理的特殊情况,整因为这样,Flink统一了流处理和批处理。
我们这里的水位线,说的就是流处理事件中一个概念或则说流处理过程中遇到问题的解决方案。
在说水位线之前,为了照顾新手,我们首先需要明白什么是流处理?
所谓的流处理,最本质的是在处理数据的时候,是接受一条处理一条数据。而批处理,则是累积数据到一定程度在处理。这是他们本质的区别。
1.22.2.流数据异常该怎么办:
上面我们明白了什么是流处理,先不要着急去想什么Watermark,假如我们自己写一个流式框架。我们该如何处理消息。如下,我们看到消息按照顺序一个个发送,接受后按照顺序处理,这是没有什么问题的。
然而在来看下面情况:
消息不在是按照顺序发送,产生了乱序,这时候该怎么处理?
如果你看到这里,相信很多人可以猜出来了,水位线-Watermark是其中的解决方案之一。
看到这里其实可能还是懵逼状态,水位线怎么能解决这个乱序。
为了更好地明白下面讲的内容,补充下Flink的一些基本知识。
1.22.2.1.Flink窗口(Window)
对于Flink如果来一条消息计算一条,这样是可以的,但是这样计算是非常频繁而且消耗资源,如果想做一些统计这是不可能的。所以对于Spark和Flink都产生了窗口计算。对于窗口感兴趣可参考Flink实时性、容错机制、窗口等介绍(https://www.aboutyun.com/forum.php?mod=viewthread&tid=25540)
1.22.2.2.事件时间
我们知道一个事件发生了,肯定是有时间的,这个时间,在Flink中被称之为事件时间,也就是Event Time。当然还有其它时间,如感兴趣可参考Flink系统学习11:【Flink1.7】事件时间、处理时间、提取时间有什么区别。
上面两个概念明白了,我们就要讲:
1.22.3.watermark有什么用?
watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
1.22.4.如何使用Watermarks处理乱序的数据流?
什么是乱序呢?如上面图示,可以理解为数据到达的顺序和他的event-time(事件发生时间)排序不一致。导致这的原因有很多,比如延迟,消息积压,重试等等
因为Watermarks是用来触发window窗口计算的,我们可以根据事件的event-time,计算出Watermarks,并且设置一些延迟,给迟到的数据一些机会,也就是说正常来讲,对于迟到的数据,我只等你一段时间,再不来就没有机会了。
这里我们需要稍微解释下:窗口的触发机制,比如滚动窗口,或则滑动窗口等,他们都是有自己的触发机制的,比如每隔5秒,窗口就会计算一次,也就是说,每隔5秒窗口就会触发一次。如对窗口计算不了解或则感兴趣,可参考Flink实时性、容错机制、窗口等介绍(http://www.aboutyun.com/forum.php?mod=viewthread&tid=25540)。这里不再详述。
我们明白了窗口的触发机制,这里我们添加了水位线,到底是个怎么个情况?我们来看下面
假如我们设置10s的时间窗口(window),那么010s,1020s都是一个窗口,以0~10s为例,0位start-time,10为end-time。假如有4个数据的event-time分别是8(A),12.5(B),9©,13.5(D),我们设置Watermarks为当前所有到达数据event-time的最大值减去延迟值3.5秒,也就是说对于迟到的数据,我们只等你3.5秒。【这里你可能有个问题,如果超过3.5秒该怎么办,这时候就需要我们对生产环境有一个整体的认识和把握,数据是否有延迟,延迟大概是多长时间,这样达到数据不丢失。当然还有另外的方法来处理延迟,我们这里只讲水位线。】
当A到达的时候,Watermarks为max8-3.5=8-3.5 = 4.5 < 10,不会触发计算
当B到达的时候,Watermarks为max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算
触发计算的时候,会将AC(因为他们都小于10)都计算进去
通过上面这种方式,我们就将迟到的C计算进去了
这里的延迟3.5s是我们假设一个数据到达的时候,比他早3.5s的数据肯定也都到达了,这个是需要根据经验推算的,加入D到达以后有到达了一个E,event-time=6,但是由于0~10的时间窗口已经开始计算了,所以E就丢了。
从这里上面E的丢失说明,水位线也不是万能的,但是如果根据我们自己的生产经验+侧道输出等方案,可以做到数据不丢失。
上面我们明白了水位线的机制,后面我们将进一步加深从代码搞懂水位线。
1.23.Watermark机制及实例详解
本文转自:https://my.oschina.net/u/2380815/blog/4444523
Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp.watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。简单来说,我们可以把他理解为水位线,这个Watermarks在不断的变化,一旦Watermarks大于了某个window的end_time,就会触发此window的计算,Watermarks就是用来触发window计算的。
在实际的生产中,由于业务系统背压或网络延迟导致事件的创建时间和处理时间不一致,导致流处理的结果跟实际结果有较大的差异。但是对于延迟数据,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。其作用定义一个最大乱序时间,举个例子,比如某条消息记录时间为2020-04-06 10:00:10,如果乱序最大允许时间为10s,那么就认为2020-04-06 10:00:00之前产生的所有消息记录都到齐了,可以进行计算。
1.24.Watermark生成方式
一般情况下,Flink在接收到Source数据后,应该立即生成Watermark,但是我们可以在经过了简单的operator后再生成Watermark,需要注意的是,如果多次生成了Watermark,后面的会覆盖前面的;生成watermark的方式主要有2大类:
1)Periodic :一定时间间隔或者达到一定的记录条数会产生一个watermark,通常这种用的比较多。
2)Punctuated : 基于event time通过一定的逻辑产生watermark,比如收到一个数据就产生一个WaterMark,时间是event time + 10秒。
以上两种产生方式,都有机制来保证产生的watermark是单调递增的。即使有了watermark,如果现实中,数据没有满足watermark所保证的条件怎么办?比如Flink处理了08:01的watermark,但是之后遇到了event time是07:00~08:00之间的数据怎么办?首先如果这种事情出现的概率非常小,不影响所要求的准确度,可以直接把数据丢弃;如果这种事情出现的概率比较大,就要调整产生water mark的机制了。
除了把违反watermark机制的数据丢弃,也有不丢弃的处理方法,比如通过一些机制来更新之前统计的结果,这种方式会有一定的性能开销。
1.25.Watermark代码实例
下面我们通过一个实例来演示下Periodic Watermark,我们从socket接收数据,然后讲过简单的operator之后,立刻抽取timetamp并生成Watermark,之后应用window来看看watermark和event time如何变化,才导致window被触发计算的。
先把主函数代码贴一下,仔细看代码中的注释:
package com.toto.demo.watermark;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import javax.annotation.Nullable;
/**
* @description 在实现滚动窗口 设置EventTime为时间处理标准,统计每个窗口单词出现次数 窗口时间为30秒,
* 消息的最大延迟时间是5秒。
**/
public class TumblingWindowWatermarkWordCount
public static void main(String[] args) throws Exception
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
/* 设置使用EventTime作为Flink的时间处理标准,不指定默认是ProcessTime */
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//这里为了便于理解,设置并行度为1,默认并行度是当前机器的cpu数量
senv.setParallelism(1);
/*指定数据源 从socket的9000端口接收数据,先进行了不合法数据的过滤*/
DataStream<String> sourceDS = senv.socketTextStream("localhost", 9000)
.filter(new FilterFunction<String>()
@Override
public boolean filter(String line) throws Exception
if (null == line || "".equals(line))
return false;
String[] lines = line.split(",");
if (lines.length != 2)
return false;
return true;
);
/*做了一个简单的map转换,将数据转换成Tuple2<long,String,Integer>格式,第一个字段代表是时间 第二个字段代表的是单词,第三个字段固定值出现了1次*/
DataStream<Tuple3<Long, String, Integer>> wordDS = sourceDS.map(new MapFunction<String, Tuple3<Long, String, Integer>>()
@Override
public Tuple3<Long, String, Integer> map(String line) throws Exception
String[] lines = line.split(",");
return new Tuple3<Long, String, Integer>(Long.valueOf(lines[0]), lines[1], 1);
);
/*设置Watermark的生成方式为Periodic Watermark,并实现他的两个函数getCurrentWatermark和extractTimestamp*/
DataStream<Tuple3<Long, String, Integer>> wordCount = wordDS.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<Long, String, Integer>>()
private Long currentMaxTimestamp = 0L;
/*最大允许的消息延迟是5秒*/
private final Long maxOutOfOrderness = 5000L;
@Nullable
@Override
public Watermark getCurrentWatermark()
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
@Override
public long extractTimestamp(Tuple3<Long, String, Integer> element, long previousElementTimestamp)
long timestamp = element.f0;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
/*这里根据第二个元素 单词进行统计 时间窗口是30秒 最大延时是5秒,统计每个窗口单词出现的次数*/
).keyBy(1)
/*时间窗口是30秒*/
.timeWindow(Time.seconds(30))
.sum(2);
wordCount.print("\\n单词统计:");
senv.execute("Window WordCount");
下面我们简单演示下计算步骤:
1.这里时间窗口是30秒,延迟时间是5秒,我这里先后输入一下数据,第一个字段代表是EventTime,第二个是出现的单词。
10000,a
20000,b
29999,c
30000,a
34998,a
这里时间窗口是30秒,Flink的时间窗口是左闭右开的[0,30000),如果这里我们设置消息延迟是0秒,输入29999,c 就应该触发,窗口计算, 可是这里我们设计的最大延迟是5秒,那什么时候触发第一次窗口计算呢?应该是29999+5000=34999,如下图所示,输入34998,e没有触发计算,跟我们的推测是一致的,如下图所示:
当输入以下(注意,如果Linux服务器上没有nc工具,直接使用yum install nc进行安装):
[root@flink01 ~]# nc -lp 9000
1000,a
2000,b
10000,a
20000,c
34998,e
的时候,没有触发窗口计算。
2.接下来我们继续输入,34999,d应该会第一次触发窗口计算进行数据的输出,至此我们应该输入了6行数据,那输出的计算结果是什么呢?这里应该是输出[0,30000)时间段之间的数据,如下图所示:
34999,d
会输出:
单词统计:> (1000,a,2)
单词统计:> (2000,b,1)
单词统计:> (20000,c,1)
3.接下来我们还要验证两件事,一是第二次窗口触发时间,按照这样计算,那下次触发计算的时间应该是30000+29999+5000=64999, 另一个是:第一次窗口触发计算完成后,又来了一条25000,a数据,该如何处理呢,由于触发了计算之后watermark应该更新成了30000,比他小的数据会被丢弃,我们验证下:
输入了25000,a和64998,c没有触发计算, 如果输入64999,c的时候,会触发执行,如图所示:
输出的结果是:
单词统计:> (34998,e,1)
单词统计:> (34999,d,1)
1.26.Watermark的案例
再如案例:https://blog.csdn.net/zhaoyuqiang/article/details/107453466
以上是关于1.22.FLINK WatermarkFlink窗口(Window)watermark有什么用?如何使用Watermarks处理乱序的数据流?机制及实例详解生成方式代码实例的主要内容,如果未能解决你的问题,请参考以下文章