Flink: controlling window behavior (triggers, evictors, allowed lateness, and sending late data to a side output)
Posted by 但行益事莫问前程
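Preface: This article was compiled by the editors of 小常识网 (cha138.com). It introduces how to control window behavior in Flink (triggers, evictors, allowed lateness, and sending late data to a side output); we hope you find it a useful reference.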
1. Trigger
A trigger controls when a window fires, that is, when the window function is evaluated. Calling .trigger() on a WindowedStream lets you pass in a custom window trigger (Trigger).
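Trigger is an abstract class; a custom trigger must implement the following four abstract methods:
------onElement(): called for every element that arrives in the window
------onEventTime(): called when a registered event-time timer fires
------onProcessingTime(): called when a registered processing-time timer fires
------clear(): called when the window is closed and destroyed; typically used to clear custom state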
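Each of these methods takes a trigger context (TriggerContext) parameter, which can be used to register timer callbacks. A timer represents an action scheduled for a future point in time. When time advances to the set value, the predefined operation is executed.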
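Apart from clear(), the three methods return a TriggerResult, an enum that defines four kinds of actions on the window:
------CONTINUE: do nothing
------FIRE: evaluate the window function and emit the result
------PURGE: clear all data in the window and discard the window, without evaluating the window function
------FIRE_AND_PURGE: evaluate the window function, emit the result, then purge the window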
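The four values can be pictured with a small plain-Java model. This is not Flink code.

- `TriggerResultModel` and `WindowBufferModel` are hypothetical names.
- The "window function" is just a sum of integers.
- Each value is split into a fire flag and a purge flag, mirroring the `isFire()` and `isPurge()` methods of Flink's `TriggerResult`.
- As in the window operator, firing happens before purging, and an empty window emits nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of Flink's TriggerResult (hypothetical names, not Flink code)
enum TriggerResultModel {
    CONTINUE(false, false),
    FIRE(true, false),
    PURGE(false, true),
    FIRE_AND_PURGE(true, true);

    final boolean fire;  // evaluate the window function and emit a result
    final boolean purge; // clear the window contents

    TriggerResultModel(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }
}

// A window buffer whose "window function" is the sum of its elements
class WindowBufferModel {
    final List<Integer> contents = new ArrayList<>();
    final List<Integer> emitted = new ArrayList<>();

    void add(int value) {
        contents.add(value);
    }

    // Same order as a window operator: fire on the current contents first, then purge
    void apply(TriggerResultModel result) {
        if (result.fire && !contents.isEmpty()) { // an empty window emits nothing
            int sum = 0;
            for (int v : contents) {
                sum += v;
            }
            emitted.add(sum);
        }
        if (result.purge) {
            contents.clear();
        }
    }

    public static void main(String[] args) {
        WindowBufferModel w = new WindowBufferModel();
        w.add(1);
        w.add(2);
        w.apply(TriggerResultModel.FIRE);           // emits 3, contents kept
        w.add(4);
        w.apply(TriggerResultModel.FIRE_AND_PURGE); // emits 7, contents cleared
        System.out.println(w.emitted);              // prints [3, 7]
    }
}
```

FIRE leaves the contents in place, so a later FIRE sees the earlier elements again and repeated firings produce cumulative results. After FIRE_AND_PURGE, the next firing starts from an empty window.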
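The trigger is an internal property of the window operator. Every window assigner (WindowAssigner) has a corresponding default trigger, and for Flink's built-in window types these triggers are already implemented. For example, the default trigger of all built-in event-time windows is EventTimeTrigger.
Source code: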
```java
package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    /**
     * Creates an event-time trigger that fires once the watermark passes the end of the window.
     *
     * <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
     * trigger window evaluation with just this one element.
     */
    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}
```
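A practical example:
The following program computes the PV (page views) of each URL in 10-second tumbling event-time windows. It sets a custom trigger that fires the window computation every 5 seconds of event time: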
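```java
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Event, ClickSource and UrlViewCount are defined in earlier parts of this tutorial series
public class TriggerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> streamSource = env.addSource(new ClickSource());
        // Timestamps are assumed to be ascending, so monotonous watermarks suffice
        SingleOutputStreamOperator<Event> watermarks = streamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long recordTimestamp) {
                                return event.timestamp;
                            }
                        }));
        // PV of each URL in 10 s tumbling event-time windows, fired by the custom trigger
        SingleOutputStreamOperator<UrlViewCount> process = watermarks.keyBy(r -> r.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .trigger(new MyTrigger())
                .process(new WindowResult());
        process.print();
        env.execute();
    }

    public static class WindowResult extends ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow> {
        @Override
        public void process(String url, Context context, Iterable<Event> elements, Collector<UrlViewCount> out) throws Exception {
            // Count explicitly: spliterator().getExactSizeIfKnown() returns -1 if the Iterable does not report its size
            long count = 0L;
            for (Event ignored : elements) count++;
            out.collect(new UrlViewCount(url, count, context.window().getStart(), context.window().getEnd()));
        }
    }

    public static class MyTrigger extends Trigger<Event, TimeWindow> {
        @Override
        public TriggerResult onElement(Event event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            // Per-key, per-window flag: have this window's timers been registered yet?
            ValueState<Boolean> isFirstEvent = ctx.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
            if (isFirstEvent.value() == null) {
                // First element of this window: register an event-time timer every 5 s
                for (long i = window.getStart(); i < window.getEnd(); i = i + 5000L) {
                    ctx.registerEventTimeTimer(i);
                }
                isFirstEvent.update(true);
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            // Every event-time timer fires the window; the contents are kept (no purge)
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            // Drop the per-window flag when the window is cleaned up
            ValueState<Boolean> isFirstEvent = ctx.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
            isFirstEvent.clear();
        }
    }
}
```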
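As the result figure shows, the computation fired 3 times for each URL within each 10-second window:
------two firings come from the timers MyTrigger registers at the window start and at start + 5 s
------the third comes from the window operator's own cleanup timer at the end of the window (end - 1 ms, since allowed lateness is 0); this timer is also passed to onEventTime(), and MyTrigger returns FIRE for every timer
Because FIRE does not purge the window, each result counts all elements that have arrived in the window so far.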
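The count of three can be checked with a plain-Java model. This is not Flink code, and `TriggerTimerModel` is a hypothetical helper. The model rests on these assumptions:

- Allowed lateness is 0, so the window operator's cleanup timer sits at `window.maxTimestamp()`, the window end minus 1 ms.
- As in Flink's window operator, this cleanup timer also goes through the trigger's `onEventTime()`.
- Timers are de-duplicated per timestamp.

For comparison, the model also counts how often `EventTimeTrigger`'s `onEventTime()` logic would fire on the same timers: only on the one equal to `window.maxTimestamp()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model (hypothetical helper, not Flink code) of the event-time timers
// that reach onEventTime() for one key's window [start, end) in TriggerExample.
// Assumes allowed lateness 0, so the operator's cleanup timer is at end - 1.
class TriggerTimerModel {

    static List<Long> timerTimes(long start, long end, long interval) {
        TreeSet<Long> timers = new TreeSet<>(); // timers are de-duplicated per timestamp
        for (long t = start; t < end; t += interval) {
            timers.add(t);                      // registered by MyTrigger.onElement()
        }
        timers.add(end - 1);                    // cleanup timer registered by the window operator
        return new ArrayList<>(timers);         // ascending order
    }

    // MyTrigger fires on every timer; EventTimeTrigger only when time == maxTimestamp (end - 1)
    static long fires(List<Long> timers, long end, boolean fireOnEveryTimer) {
        return timers.stream().filter(t -> fireOnEveryTimer || t == end - 1).count();
    }

    public static void main(String[] args) {
        List<Long> timers = timerTimes(0L, 10_000L, 5_000L);
        System.out.println(timers);                        // prints [0, 5000, 9999]
        System.out.println(fires(timers, 10_000L, true));  // MyTrigger: prints 3
        System.out.println(fires(timers, 10_000L, false)); // EventTimeTrigger logic: prints 1
    }
}
```

For the window [0, 10000) with a 5000 ms interval, the timers are 0, 5000 and 9999. That gives three firings with MyTrigger's unconditional FIRE, but only one with EventTimeTrigger's check.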
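2. Evictor
An evictor defines logic for removing some of the elements from a window. Calling .evictor() on a WindowedStream lets you pass in a custom evictor (Evictor). Evictor is an interface. Besides custom implementations, Flink ships pre-implemented evictors such as CountEvictor, TimeEvictor and DeltaEvictor. No evictor is used unless one is set explicitly.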
```java
stream.keyBy(...)
        .window(...)
        .evictor(new MyEvictor())
```
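The Evictor interface defines two methods:
------evictBefore(): removes elements before the window function is executed
------evictAfter(): removes elements after the window function is executed
By default, the pre-implemented evictors remove elements before the window function is executed.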
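As an illustration of evictBefore(), here is a plain-Java model of count-based eviction in the style of Flink's built-in `CountEvictor`. It does not implement the Flink `Evictor` interface, and `CountEvictorModel` is a hypothetical name. When the window holds more than `maxCount` elements, the surplus is removed from the beginning of the buffer through `Iterator.remove()` before the window function sees it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java model (hypothetical, not the Flink Evictor interface) of count-based
// eviction before the window function, in the style of Flink's CountEvictor.
class CountEvictorModel {
    private final long maxCount;

    CountEvictorModel(long maxCount) {
        this.maxCount = maxCount;
    }

    // Keep at most maxCount elements; remove the surplus from the start of the buffer
    <T> void evictBefore(Iterable<T> elements, int size) {
        if (size <= maxCount) {
            return; // nothing to evict
        }
        long toEvict = size - maxCount;
        Iterator<T> it = elements.iterator();
        for (long i = 0; i < toEvict && it.hasNext(); i++) {
            it.next();
            it.remove();
        }
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(List.of(1, 2, 3, 4, 5));
        new CountEvictorModel(2).evictBefore(window, window.size());
        int sum = 0; // the "window function" now only sees the remaining elements
        for (int v : window) {
            sum += v;
        }
        System.out.println(window + " sum=" + sum); // prints [4, 5] sum=9
    }
}
```

In a real job, the built-in counterpart would be attached with `.evictor(CountEvictor.of(2))`. In the model, a buffer holding 1 to 5 keeps only 4 and 5.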
3. Allowed Lateness
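When the watermark reaches the end time of a window, the window fires, emits its result, and is normally destroyed. If data that belongs to the window arrives after the window has closed, it is dropped by default.

To handle such late data, Flink lets you set a maximum allowed lateness (Allowed Lateness) on the window operator. During this period the window is not destroyed: late data can still enter the window and triggers the computation again. Only when the watermark advances to window end time + allowed lateness is the window's content cleared and the window finally closed.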
```java
stream.keyBy(...)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .allowedLateness(Time.minutes(1))
```
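The decision made for each incoming element can be modeled in a few lines of plain Java. This is not Flink code, and `LatenessModel` is a hypothetical name. The sketch assumes a tumbling event-time window with the default EventTimeTrigger:

- The window fires when the watermark reaches `maxTimestamp = end - 1`.
- Its state is cleared once the watermark reaches `maxTimestamp + allowedLateness`.

```java
// Plain-Java model (hypothetical, not Flink code) of how a tumbling event-time window
// with the default EventTimeTrigger and allowed lateness treats a new element.
class LatenessModel {

    enum Outcome { ON_TIME, LATE_FIRES_AGAIN, DROPPED_OR_SIDE_OUTPUT }

    // windowEnd is exclusive, as in TimeWindow; all values in milliseconds
    static Outcome classify(long windowEnd, long allowedLateness, long currentWatermark) {
        long maxTimestamp = windowEnd - 1;
        long cleanupTime = maxTimestamp + allowedLateness;
        if (cleanupTime <= currentWatermark) {
            // window state already cleared: element dropped, or sent to the late-data side output
            return Outcome.DROPPED_OR_SIDE_OUTPUT;
        }
        if (maxTimestamp <= currentWatermark) {
            // window already fired once but is kept alive: it fires again right away
            return Outcome.LATE_FIRES_AGAIN;
        }
        // normal case: the timer at maxTimestamp fires the window later
        return Outcome.ON_TIME;
    }

    public static void main(String[] args) {
        long end = 3_600_000L;   // 1-hour window [0, 3600000)
        long lateness = 60_000L; // allowedLateness(Time.minutes(1))
        System.out.println(classify(end, lateness, 3_000_000L)); // prints ON_TIME
        System.out.println(classify(end, lateness, 3_599_999L)); // prints LATE_FIRES_AGAIN
        System.out.println(classify(end, lateness, 3_659_999L)); // prints DROPPED_OR_SIDE_OUTPUT
    }
}
```

Take the one-hour window with one minute of lateness. Once the watermark reaches 3,599,999, a new element for that window makes it fire again. From 3,659,999 onward, the element is dropped, or sent to the side output if one is configured.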
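4. Sending late data to a side output
Calling .sideOutputLateData() on a WindowedStream routes late data that was not accepted into a window to a side output for separate processing. The method takes an output tag (OutputTag) that labels the branch stream of late data. Because the side output holds the stream's original elements, the OutputTag has the same type as the stream's elements.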
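```java
DataStream<Event> stream = env.addSource(...);
// The trailing {} creates an anonymous subclass so that Flink can extract the generic type Event
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .sideOutputLateData(outputTag)
```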
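After late data has been routed to the side output, take the DataStream produced by the window processing. Call .getSideOutput() on it with the corresponding output tag to obtain the stream of late data: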
```java
SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .sideOutputLateData(outputTag)
        .aggregate(new MyAggregateFunction());
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);
```
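To see which elements end up in `lateStream`, here is a plain-Java replay of one key's windows. This is not Flink code, and `SideOutputReplay` is a hypothetical name. It assumes:

- 10 s tumbling windows with 5 s of allowed lateness;
- the default EventTimeTrigger behavior;
- a watermark equal to the highest timestamp seen so far minus 1 ms, advanced after every element. In Flink, watermarks are emitted periodically rather than after each element.

Each firing is recorded as "start-end:count".

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java replay (hypothetical, not Flink code) of one key's tumbling event-time windows
// with allowed lateness and a late-data side output. Assumption: after every element the
// watermark becomes (highest timestamp seen - 1 ms), like forMonotonousTimestamps().
class SideOutputReplay {
    final long size;
    final long lateness;
    long watermark = Long.MIN_VALUE;
    long maxSeen = Long.MIN_VALUE;
    final Map<Long, Long> windows = new TreeMap<>();    // window start -> element count
    final List<String> mainOutput = new ArrayList<>(); // one "start-end:count" per firing
    final List<Long> lateOutput = new ArrayList<>();   // timestamps sent to the side output

    SideOutputReplay(long size, long lateness) {
        this.size = size;
        this.lateness = lateness;
    }

    void process(long ts) {
        long start = ts - Math.floorMod(ts, size);
        long maxTs = start + size - 1;
        if (maxTs + lateness <= watermark) {
            lateOutput.add(ts);                  // window already cleaned up
        } else {
            long count = windows.merge(start, 1L, Long::sum);
            if (maxTs <= watermark) {
                emit(start, count);              // late but allowed: fire again at once
            }
        }
        maxSeen = Math.max(maxSeen, ts);
        advanceWatermark(maxSeen - 1);
    }

    private void advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return;
        }
        long oldWatermark = watermark;
        watermark = newWatermark;
        Iterator<Map.Entry<Long, Long>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> w = it.next();
            long maxTs = w.getKey() + size - 1;
            if (maxTs > oldWatermark && maxTs <= watermark) {
                emit(w.getKey(), w.getValue());  // regular firing at the end of the window
            }
            if (maxTs + lateness <= watermark) {
                it.remove();                     // allowed lateness expired: drop the state
            }
        }
    }

    private void emit(long start, long count) {
        mainOutput.add(start + "-" + (start + size) + ":" + count);
    }

    public static void main(String[] args) {
        SideOutputReplay r = new SideOutputReplay(10_000L, 5_000L);
        for (long ts : new long[] {1_000L, 12_000L, 3_000L, 16_000L, 4_000L}) {
            r.process(ts);
        }
        System.out.println(r.mainOutput); // prints [0-10000:1, 0-10000:2]
        System.out.println(r.lateOutput); // prints [4000]
    }
}
```

In this replay:

- The element at 3000 arrives late but within the allowed lateness, so the window [0, 10000) fires a second time with count 2.
- The element at 4000 arrives after that window's cleanup time (14999) has passed, so it goes to the side output instead of being dropped.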