Flink事件时间和水印详解

Posted 长臂人猿

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink事件时间和水印详解相关的知识,希望对你有一定的参考价值。

前言

Flink使用版本:1.12.1。

  水印是一个标记的时间戳,是一个标记:意味着水印代表时间前的数据均已到达(人为的设定——开发人员可以控制延迟和完整性之间的权衡),这一点水印保障了乱序问题的解决(这很重要,特别是多分区kafka消费)。因为在流处理中,面对乱序问题,你不可能一直等待数据的到达而不去对数据进行操作(尤其像是聚合操作这类操作)。故此面对超时到达的数据你必须进行处理,如何判断超时数据——水印,你也可以设置一定的延迟时间。这两点(解决乱序;允许延迟)也是水印的主要功能。第二点通常与窗口一起使用:水印能拒绝过期数据,但是不能将流式数据进行“短暂”的“批处理”,所以用到窗口。

时间分类

Flink 中的时间分为三种:

  事件时间(Event Time)指的是数据产生的时间,这个时间一般由数据生产方自身携带,比如 Kafka 消息,每个生成的消息中自带一个时间戳代表每条数据的产生时间。

  •   事件时间(Event Time),即事件实际发生的时间,生产时间;
  •   摄入时间(Ingestion Time),事件进入Flink流处理框架的时间;
  •   处理时间(Processing Time),事件被处理的时间。

在Flink 1.12版本前可以设置Flink主程时间特征:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//设置Flink系统使用时间为事件时间EventTime , 即时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

但是在在1.12版本后,该方法setStreamTimeCharacteristic被弃用!(FLIP-134: Batch execution for the DataStream API)时间特征默认为EvnetTime,若使用的是IngestionTime需要在手动实现水印策略WatermarkStrategy 接口,具体见下文的水印的产生


水印的意义

  官网有言:如果你想使用事件时间(EventTime),你还需要提供一个时间戳提取器(Timestamp Extractor)和水印生成器(Watermark Generator)。

水印所做的——它们定义了何时停止等待较早的事件:水印是 Flink 判断迟到数据的标准,同时也是窗口触发的标记(窗口结束条件之一),是开发人员可以控制延迟和完整性之间的权衡。与批处理不同,在批处理中,人们可以在产生任何结果之前拥有完整的输入知识,而在流式处理中,则最终必须停止等待查看更多输入,并产生某种结果。流式处理中聚合事件(如count,sum)与批处理不同,需要在一定的范围内运算,而流是无界的,那么我们就需要搭配窗口来限定这个范围。窗口是这个范围限定,而水印则是范围的延迟。水印能拒绝过期数据,但是不能将流式数据进行“短暂”的“批处理”,所以用到窗口。

  程序并行度大于 1 的情况下,会有多个流产生水印和窗口,这时候 Flink 会选取时间戳最小的水印。

官网流分析


设计模拟数据源

我们模拟kafka数据自带eventTime时间戳(最后一个),生产数据格式:(oneToic,6) + eventTime,逐渐递增。

package com.cbry.windows;

import java.util.Random;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class WindowImageSource implements SourceFunction<Tuple2<String, Long>>{

	private static final long serialVersionUID = 1L;
	private boolean is_Running = true;
	
	@Override
	public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
		// TODO Auto-generated method stub
		
		 Random random = new Random();
		 
		 int i = 1;
		 
		while(is_Running) {
			
			Tuple2<String, Long> element = new Tuple2<String, Long>("oneToic",(long)i);
			//ctx.collect(element);
			//生成水印
			if(i % 6 ==0) {
				ctx.collectWithTimestamp(element, (long)i*1000+2617160286000L);
			}else {
				ctx.collectWithTimestamp(element, (long)i*1000+1617160286000L);
			}
			i++;
			//每1秒一个数据
			Thread.sleep(1000);
		}
	}

	@Override
	public void cancel() {
		// TODO Auto-generated method stub
		is_Running=false;
	}

}


水印的产生

水印的使用


WatermarkStrategy的静态实现策略

在老版本中Watermark 的生成方式有两种:

  • AssignerWithPeriodicWatermarks 生成周期水印,周期默认的时间是 200ms;
  • AssignerWithPunctuatedWatermarks 按需生成水印。

为了避免代码的重复,在flink 1.11 中对flink的水印生成接口进行了重构:出现了新的方法:

dataStream.assignTimestampsAndWatermarks( WatermarkStrategy  watermarkStrategy  )

assignTimestampsAndWatermarks里面的参数可以是 WatermarkStrategy接口里面开箱即用的静态方法:内置水印生成器

  • forMonotonousTimestamps:为时间戳单调递增的情况创建水印策略。这个也就是相当于以event(流元素)中的时间戳充当了水印。

  • forBoundedOutOfOrderness(Duration maxOutOfOrderness):固定延迟生成水印:好比下面的自定义的水印生成器,将元素的时间戳,“提前”固定延迟:也就是入参maxOutOfOrderness。比如说event元素的时间戳为11.30分,入参15min,生成水印时间为11.15,我们实际上就允许了11.30这个延迟了15min的数据进入窗口(可能是新窗口,但是被水印拦截掉的不会进入窗口)。

    关于窗口和水印的结合使用如果有疑问,请看完全文结合用例结果再回过头来看思考一下。


WatermarkStrategy接口

除去静态实现的方法(本质上也官方实现该接口的类),我们关注一下本身这个接口的核心方法,我们可以自己自定义实现该接口实现自己个性化的水印策略

public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>,
            WatermarkGeneratorSupplier<T>{

    /**
     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
     * strategy.
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

使用WatermarkStrategy这种方式需要一个流并产生一个带有时间戳元素和水印的新流。如果原始流已经有时间戳和/或水印,时间戳分配器会覆盖它们。对于时间戳和水印该接口需要实现createTimestampAssigner和createWatermarkGenerator方法。


createTimestampAssigner

实例化一个TimestampAssigner用于根据此水印战略分配时间戳。

TimestampAssigner是一个从事件中提取字段的简单函数。


createWatermarkGenerator

编写一个水印生成器。

对于该方法需要实现的WatermarkGenerator接口存在两个方法:

  • onEvent:对流数据处理函数;

  • onPeriodicEmit:定期执行函数;


官网有给出自定义实现水印生成器

1、定期水印生成器:

​ onEvent方法什么也不写,赋予周期方法onPeriodicEmit逻辑。

public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // 什么也不做,我们专注于周期生产水印
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}


2、特殊事件水印生成器:

onPeriodicEmit周期方法什么也不做,赋予onEvent方法对每个事件(流数据)逻辑。
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 什么也不做,我们专注于每个事件(流数据)的逻辑
    }
}


自定义实现水印生产者

package com.cbry.windows;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.java.tuple.Tuple2;
/**
 * 实现一个简单的周期性的发射水印的例子
 * 在这个onEvent方法里,我们从每个元素里抽取了一个时间字段,但是我们并没有生成水印发射给下游,而是自己保存了在一个变量里,在onPeriodicEmit方法里,使用最大的日志时间减去我们想要的延迟时间作为水印发射给下游。
 * **/
public class MyWaterMarks implements WatermarkGenerator<Tuple2<String,Long>>{
	
	private long maxTimestamp;
	//设置允许乱序时间
	private long delay = 5000;

	/*
	 * 为每个事件调用,允许水印生成器检查并记住事件时间戳,或基于事件本身发出水印。
	 * */
	@Override
	public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
		// TODO Auto-generated method stub
		//记录最新的数据时间的值
		//maxTimestamp = Math.max(maxTimestamp, event.f1);
		maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
		System.err.println("maxTimestamp: " + maxTimestamp + " eventTimestamp: " + eventTimestamp);
	}

	/*
	 * 定期调用,并且可能会发出新的水印。
	 * */
	@Override
	public void onPeriodicEmit(WatermarkOutput output) {
		// TODO Auto-generated method stub
		//返回实际可接纳的时间,保障已有的数据时间 《= 水印
		output.emitWatermark(new Watermark(maxTimestamp - delay));
		//System.err.println("水印:" +  new Watermark(maxTimestamp - delay).toString());
	}

}

onEvent函数里面的 说明:即数据携带的时间戳信息;我们用弃用方法:设置成进入程序时间(IngestionTime)。打印的是当前时间,而非数据源产生携带时间。如果不设置(Flink 1.12默认为EventTime)即为正确时间。

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
maxTimestamp = Math.max(maxTimestamp, event.f1);	//便于展示:maxTimestamp打印的是event数据内容


自定义水印搭配滚动时间窗口效果

上面的模拟Source,为了便于描述我们从(onTopic,1)作为第一条数据开始生产。

实现代码

public class WaterMark {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);
		
	     //设置水印生成时间间隔100ms
        env.getConfig().setAutoWatermarkInterval(100);
		
		//设置Flink系统使用时间为事件时间EventTime , 即时间特征
		//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
		
		//设置一个延迟6秒的固定水印
		DataStream<Tuple2<String,Long>> source = env.addSource(new WindowImageSource());
		source.print("in ");
/*				env.socketTextStream("localhost", 9999);*/
		
		SingleOutputStreamOperator<Tuple2<String,Long>> dataStream = source.map(data -> new Tuple2<String,Long>(
				data.f0,data.f1
               // data.split(",")[0],  Long.parseLong(data.split(",")[1].trim())
        )).returns(Types.TUPLE(Types.STRING, Types.LONG));	//使用Lambda表达式返回必须指定返回类型

		
		dataStream.assignTimestampsAndWatermarks(
				//WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(6))
				//WatermarkStrategy.forMonotonousTimestamps()
				//WatermarkStrategy.forGenerator(new MyWaterMarks);
				new WatermarkStrategy<Tuple2<String,Long>>(){

					private static final long serialVersionUID = 1L;

					@Override
					public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(
							org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier.Context context) {
						// TODO Auto-generated method stub
						return new MyWaterMarks();
					}
					
				}
				
				).keyBy((event) -> event.f0)
			//.window(TumblingEventTimeWindows.of(Time.seconds(5)))
			.window(TumblingEventTimeWindows.of(Time.seconds(6)))
			//.max(1)
			.process(new ProcessWindowFunction<Tuple2<String,Long>, String, String, TimeWindow>() {

				private static final long serialVersionUID = 1L;

			@Override
			  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
			    long count = 0;
			    //集合
			    ArrayList<Long> conllect = new ArrayList<Long>();
			    for (Tuple2<String, Long> in: input) {
			      conllect.add(in.f1);
			      count++;
			    }
			    out.collect("Window: " + context.window() + "count: " + count + " 数据:" + input.toString());
			  }
			})
			.printToErr("out ")
			;
		
		env.execute("Cbry WaterMark Test");
		
		
	}
	
}


水印和窗口初始参数

水印生产器:允许慢5s的数据加入 , 水印比事件时间EventTime提前5s;
滚动窗口6s ;

输出结果

思考第一个窗口的开始时间的区间怎么确定的?答在第一个窗口。

in > (oneToic,1)
maxTimestamp: 1617160287000 eventTimestamp: 1617160287000
in > (oneToic,2)
maxTimestamp: 1617160288000 eventTimestamp: 1617160288000
in > (oneToic,3)
maxTimestamp: 1617160289000 eventTimestamp: 1617160289000
in > (oneToic,4)
maxTimestamp: 1617160290000 eventTimestamp: 1617160290000
in > (oneToic,5)
maxTimestamp: 1617160291000 eventTimestamp: 1617160291000
in > (oneToic,6)
maxTimestamp: 2617160292000 eventTimestamp: 2617160292000

水印:Watermark @ 2617160287000 (2052-12-07 12:58:07.000)

out > Window: TimeWindow{start=1617160284000, end=1617160290000}count: 3 数据:[(oneToic,1), (oneToic,2), (oneToic,3)]
out > Window: TimeWindow{start=1617160290000, end=1617160296000}count: 2 数据:[(oneToic,4), (oneToic,5)]
in > (oneToic,7)
maxTimestamp: 2617160292000 eventTimestamp: 1617160293000
in > (oneToic,8)
maxTimestamp: 2617160292000 eventTimestamp: 1617160294000

… …

水印:Watermark @ 2617160287000 (2052-12-07 12:58:07.000)
in > (oneToic,12)
maxTimestamp: 2617160298000 eventTimestamp: 2617160298000
水印:Watermark @ 2617160293000 (2052-12-07 12:58:13.000)

… …

水印:Watermark @ 2617160293000 (2052-12-07 12:58:13.000)
in > (oneToic,18)
maxTimestamp: 2617160304000 eventTimestamp: 2617160304000
水印:Watermark @ 2617160299000 (2052-12-07 12:58:19.000)
out > Window: TimeWindow{start=2617160292000, end=2617160298000}count: 1 数据:[(oneToic,6)]

具体加上水印输出效果图(0.1s有点小,可以适当调大):

分析前,先了解配合使用的eventTimeWindows窗口:

EventTimeWindow

  疑惑:数据的事件时间(EventTime)大于窗口,会否终止?

  答曰:窗口是左闭右开的,形式为:[window_start_time,window_end_time),故EventTime大于等于窗口建立新窗口。

EventTimeWindow结果是按照Event Time的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)。

首先根据event流数据的时间戳EventTime创建第一个窗口,窗口Window会在以下的条件满足时被触发执行:

  • watermark时间 >= window_end_time;
  • 在[window_start_time,window_end_time)中有数据存在(入窗);

官网有对应的描述:Interaction of watermarks and windows :水印与窗口的交互列出了这两点,但是晦涩难懂,意思就是上面的两点意思。


分析

第一个窗口

第一条数据:1617160287000
第一个窗口:TimeWindow{start=1617160284000, end=1617160290000}

我们在生产数据的时候以6结尾的数据:(long)i*1000+2617160286000L

in > (oneToic,6)
maxTimestamp: 2617160292000 eventTimestamp: 2617160292000

关于窗口初始化的范围:由TumblingEventTimeWindows源码知:由窗口大小size,和

 protected TumblingEventTimeWindows(long size, long offset, WindowStagger windowStagger)
 public static TumblingEventTimeWindows of(Time size)  //多态可设置偏移量:Time offset ,单size构造offset为0
 //后面一直追到:WindowStagger.getStaggerOffset —> TimeWindow.getWindowStartWithOffset:

最终层层取余运算只与:窗口大小size + 偏移量offset(这里为0,无关) + 第一个数据时间戳timestamp + 当前程序处理时间currentProcessingTime 有关 。

源码有解释道: 窗口的开始和当前处理时间作为偏移量,这样,窗户是交错的。


第二个窗口

第二个窗口:TimeWindow{start=1617160290000, end=1617160296000}
这个时候二窗口从第四个数据(oneToic,4)开始:

in > (oneToic,4)
maxTimestamp: 1617160290000 eventTimestamp: 1617160290000
in > (oneToic,5)
maxTimestamp: 1617160291000 eventTimestamp: 1617160291000
in > (oneToic,6)
maxTimestamp: 2617160292000 eventTimestamp: 2617160292000
in > (oneToic,7)
maxTimestamp: 2617160292000 eventTimestamp: 1617160293000
in > (oneToic,8)
maxTimestamp: 2617160292000 eventTimestamp: 1617160294000

第6个数据(oneToic,6)是 2617160292000源自:

原数据产生:

if(i % 6 ==0) {
ctx.collectWithTimestamp(element, (long)i*1000+2617160286000L);
}

这时候水印超出了窗口时间:

in > (oneToic,6)
maxTimestamp: 2617160292000 eventTimestamp: 2617160292000
水印:Watermark @ 2617160287000 (2052-12-07 12:58:07.000)

  数据不被窗口(1617160290000-1617160296000 )采纳,生成新的窗口三(2617160292000-2617160298000),同时水印更新最大值为2617160287000,水印意味着——水印前的数据均已到达,再有舍弃。所以后面的7/8/9均被舍弃。

这里水印为2617160287000的原因:

	//自定义水印MyWaterMarks中
	//	topic6
	//  2617160292000 - 5000 = 2617160287000
	output.emitWatermark(new Watermark(maxTimestamp - delay));
	System.err.println("水印:" +  new Watermark(maxTimestamp - delay).toString());

故此二窗口只有:

count: 2 数据:[(oneToic,4), (oneToic,5)]


第三个窗口

【!important】:且第三个窗口根据新的值(topic6的EventTime)来,且到达topic18的时候水印:2617160299000>=窗口的结束时间:2617160298000触发窗口计算—>生成新的窗口。

水印:Watermark @ 2617160293000 (2052-12-07 12:58:13.000)
in > (oneToic,18)
maxTimestamp: 2617160304000 eventTimestamp: 2617160304000
水印:Watermark @ 2617160299000 (2052-12-07 12:58:19.000)
out > Window: TimeWindow{start=2617160292000, end=2617160298000}count: 1 数据:[(oneToic,6)]

那为什么12没有进去三窗口呢?因为12:eventTimestamp: 2617160298000,刚好在窗口的边界上,窗口左开右闭(>=start 且 <end)。


后面每个窗口存放第6的倍数个数据:

水印:Watermark @ 2617160299000 (2052-12-07 12:58:19.000)
in > (oneToic,24)
maxTimestamp: 2617160310000 eventTimestamp: 2617160310000
水印:Watermark @ 2617160305000 (2052-12-07 12:58:25.000)
out > Window: TimeWindow{start=2617160298000, end=2617160304000}count: 1 数据:[(oneToic,12)]


关于迟到数据处理

有两种方法:

  1. 窗口允许延迟
  2. 侧流输出

窗口允许延迟

但是有些业务场景需要我们等待一段时间,也就是接受一定范围的迟到数据,此时 allowedLateness 的设置就显得尤为重要。简单地说,allowedLateness 的设置就是对于那些水印通过窗口的结束时间后,还允许等待一段时间。

stream.
    .keyBy(...)
    .window(...)
    .allowedLateness(Time.seconds(10))
    .process(...);


侧流输出

我们通过调用WindowedStream.sideOutputLateData()方法将迟到数据发送到指定OutputTag的侧输出流。

关于侧流输出请看:Flink 三种数据流分流(推荐旁路分流)

OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Event> result = stream.
    .keyBy(...)
    .window(...)
    .sideOutputLateData(lateTag)
    .process(...);
  
DataStream<Event

以上是关于Flink事件时间和水印详解的主要内容,如果未能解决你的问题,请参考以下文章

Flink 1.8 Generating Timestamps, Watermarks 生成时间戳, 水印

Flink 操作示例 —— 水印

Flink流处理之窗口算子分析

Apache Flink:使用事件时间方式处理工业数据和延迟数据

Flink中window 窗口和时间以及watermark水印

Flink / Scala - TimeWindow 处理迟到数据详解