Flink窗口转换算子
Posted 虎鲸不是鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink窗口转换算子相关的知识,希望对你有一定的参考价值。
窗口转换算子预览
之前阅读 Flink 源码时发现,Flink 的窗口有很多种,它们都通过抽象类 WindowAssigner 来给元素分配窗口:
package org.apache.flink.streaming.api.windowing.assigners;
/**
* A @code WindowAssigner assigns zero or more @link Window Windows to an element.
*
* <p>In a window operation, elements are grouped by their key (if available) and by the windows to
* which it was assigned. The set of elements with the same key and window is called a pane. When a
* @link Trigger decides that a certain pane should fire the @link
* org.apache.flink.streaming.api.functions.windowing.WindowFunction is applied to produce output
* elements for that pane.
*
* @param <T> The type of elements that this WindowAssigner can assign windows to.
* @param <W> The type of @code Window that this assigner assigns.
*/
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable
private static final long serialVersionUID = 1L;
/**
* Returns a @code Collection of windows that should be assigned to the element.
*
* @param element The element to which windows should be assigned.
* @param timestamp The timestamp of the element.
* @param context The @link WindowAssignerContext in which the assigner operates.
*/
public abstract Collection<W> assignWindows(
T element, long timestamp, WindowAssignerContext context);
/** Returns the default trigger associated with this @code WindowAssigner. */
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
/**
* Returns a @link TypeSerializer for serializing windows that are assigned by this @code
* WindowAssigner.
*/
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
/**
* Returns @code true if elements are assigned to windows based on event time, @code false
* otherwise.
*/
public abstract boolean isEventTime();
/**
* A context provided to the @link WindowAssigner that allows it to query the current
* processing time.
*
* <p>This is provided to the assigner by its containing @link
* org.apache.flink.streaming.runtime.operators.windowing.WindowOperator, which, in turn, gets
* it from the containing @link org.apache.flink.streaming.runtime.tasks.StreamTask.
*/
public abstract static class WindowAssignerContext
/** Returns the current processing time. */
public abstract long getCurrentProcessingTime();
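窗口算子需要传入的参数 WindowAssigner 有多个实现类,例如 TumblingEventTimeWindows、TumblingProcessingTimeWindows、SlidingEventTimeWindows、SlidingProcessingTimeWindows、EventTimeSessionWindows、ProcessingTimeSessionWindows 以及 GlobalWindows。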
源码:
package org.apache.flink.streaming.api.datastream;
public class KeyedStream<T, KEY> extends DataStream<T>
// ------------------------------------------------------------------------
// Windowing
// ------------------------------------------------------------------------
/**
* Windows this @code KeyedStream into tumbling time windows.
*
* <p>This is a shortcut for either @code .window(TumblingEventTimeWindows.of(size)) or @code
* .window(TumblingProcessingTimeWindows.of(size)) depending on the time characteristic set
* using @link
* org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
*
* @param size The size of the window.
* @deprecated Please use @link #window(WindowAssigner) with either @link
* TumblingEventTimeWindows or @link TumblingProcessingTimeWindows. For more information,
* see the deprecation notice on @link TimeCharacteristic
*/
@Deprecated
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size)
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime)
return window(TumblingProcessingTimeWindows.of(size));
else
return window(TumblingEventTimeWindows.of(size));
/**
* Windows this @code KeyedStream into sliding time windows.
*
* <p>This is a shortcut for either @code .window(SlidingEventTimeWindows.of(size, slide)) or
* @code .window(SlidingProcessingTimeWindows.of(size, slide)) depending on the time
* characteristic set using @link
* org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
*
* @param size The size of the window.
* @deprecated Please use @link #window(WindowAssigner) with either @link
* SlidingEventTimeWindows or @link SlidingProcessingTimeWindows. For more information,
* see the deprecation notice on @link TimeCharacteristic
*/
@Deprecated
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide)
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime)
return window(SlidingProcessingTimeWindows.of(size, slide));
else
return window(SlidingEventTimeWindows.of(size, slide));
/**
* Windows this @code KeyedStream into tumbling count windows.
*
* @param size The size of the windows in number of elements.
*/
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size)
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
/**
* Windows this @code KeyedStream into sliding count windows.
*
* @param size The size of the windows in number of elements.
* @param slide The slide interval in number of elements.
*/
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide)
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
/**
* Windows this data stream to a @code WindowedStream, which evaluates windows over a key
* grouped stream. Elements are put into windows by a @link WindowAssigner. The grouping of
* elements is done both by key and by window.
*
* <p>A @link org.apache.flink.streaming.api.windowing.triggers.Trigger can be defined to
* specify when windows are evaluated. However, @code WindowAssigners have a default @code
* Trigger that is used if a @code Trigger is not specified.
*
* @param assigner The @code WindowAssigner that assigns elements to windows.
* @return The trigger windows data stream.
*/
@PublicEvolving
public <W extends Window> WindowedStream<T, KEY, W> window(
WindowAssigner<? super T, W> assigner)
return new WindowedStream<>(this, assigner);
与窗口有关的算子有三类:已经过时的 timeWindow 时间窗口(滚动、滑动两个重载),countWindow 计数窗口(滚动、滑动两个重载,未过时),以及通用的 window 算子(需要传入 WindowAssigner 窗口分配器)。
过时的时间窗口算子
下面以处理时间为例,事件时间的情况与之类似。后续会尽量使用新 API,不会深究老 API 以及老的 DataSet 批处理。
滚动时间窗口
package com.zhiyong.flinkStream;
import com.zhiyong.flinkStudy.FlinkWordCountDemo2FlatMapFunction;
import com.zhiyong.flinkStudy.WordCountSource1ps;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;
/**
* @program: study
* @description: Flink窗口转换算子
* @author: zhiyong
* @create: 2022-03-23 23:59
**/
public class WindowDemo
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//过时方法,必须设置,才能正常使用算子,否则报错
DataStreamSource<String> data = env.addSource(new WordCountSource1ps());
SingleOutputStreamOperator<Tuple2<String, Integer>> result1 = data.flatMap(new FlatMapFunction1())
.keyBy(new KeySelector<Tuple2<String, Integer>, Object>()
@Override
public Object getKey(Tuple2<String, Integer> value) throws Exception
return value.f0;
)
.timeWindow(Time.seconds(30))
//过时的滚动时间窗口
.sum(1);
result1.print("过时的滚动时间窗口");
env.execute("streaming");
private static class FlatMapFunction1 implements FlatMapFunction<String,Tuple2<String,Integer>>
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception
for(String cell:value.split("\\\\s+"))
out.collect(Tuple2.of(cell,1));
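timeWindow 只有在时间语义被显式设置为 ProcessingTime 时才会使用处理时间窗口,否则会使用事件时间窗口(TumblingEventTimeWindows)。Flink 1.12 起默认的时间语义就是事件时间。而自定义数据源产生的数据不带时间戳,因此如果不使用上述过时方法把 env 的时间语义设为处理时间,运行时会报错: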
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool …(堆栈在此处被截断)

以上是关于Flink窗口转换算子的主要内容,如果未能解决你的问题,请参考以下文章