Flink Single-Stream Operators
map takes one element and produces one element; the transformation logic lives in the MapFunction. RichMapFunction extends MapFunction and additionally gives access to the RuntimeContext, which can be used to query the runtime state of the operator's current parallel subtask, accumulators, broadcast variables, and so on.
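A minimal sketch of the difference (the class name TagWithSubtask and the tagging logic are invented for illustration): a RichMapFunction can call getRuntimeContext(), a plain MapFunction cannot.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// tags each record with the index of the parallel subtask that processed it
public class TagWithSubtask extends RichMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> map(String value) {
        // getRuntimeContext() is only available in the Rich* function variants
        return Tuple2.of(value, getRuntimeContext().getIndexOfThisSubtask());
    }
}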
flatMap takes one element and produces any number of elements (possibly zero); the transformation logic lives in the FlatMapFunction.
The Rich variant is analogous to RichMapFunction.
process / ProcessFunction handles each record: out emits records downstream, while ctx can query the record's timestamp, register timers with the TimerService, and so on; state can also be obtained to buffer data temporarily.
Once a timer has been registered with the TimerService (registering the same timestamp several times only fires it once), the timer fires when the watermark passes that timestamp and onTimer is called; there you can run some logic, for example merging the buffered statistics into a single record and emitting it through out.
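A rough sketch of this pattern, assuming a keyed stream of (word, count) pairs; the class name, the state name and the 10-second timer boundary are all invented. KeyedProcessFunction is used here because keyed state and timers require a keyed stream.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountUntilTimer
        extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private transient ValueState<Integer> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx,
                               Collector<Tuple2<String, Integer>> out) throws Exception {
        // buffer the running count per key in state instead of emitting immediately
        Integer current = countState.value();
        countState.update(current == null ? value.f1 : current + value.f1);

        // register a timer on the next 10s boundary; registering the same timestamp
        // again for the same key is de-duplicated, so it only fires once
        long nextBoundary = ctx.timerService().currentProcessingTime() / 10_000 * 10_000 + 10_000;
        ctx.timerService().registerProcessingTimeTimer(nextBoundary);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Integer>> out) throws Exception {
        // merge the buffered statistics into one record and emit it downstream
        out.collect(Tuple2.of(ctx.getCurrentKey(), countState.value()));
        countState.clear();
    }
}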
project only works on Tuple streams: the original Tuple is mapped to a new Tuple. fieldIndexes are the positions of fields in the original Tuple, and the selected fields are arranged into the new Tuple in the order given by fieldIndexes.
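For example (a tiny self-contained job; the values are arbitrary):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProjectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<String, Integer, Long>> in =
                env.fromElements(Tuple3.of("a", 1, 100L), Tuple3.of("b", 2, 200L));
        // keep only fields 2 and 0 of the original tuple, in that order
        DataStream<Tuple2<Long, String>> projected = in.project(2, 0);
        projected.print();
        env.execute("project demo");
    }
}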
These methods obtain a non-keyed AllWindowedStream in different ways (windowAll, countWindowAll, and so on).
All sorts of fancy ways to emit output (print, writing to sinks, and so on)~
reduce on a window aggregates the data of one window into a single record of the same type.
The ReduceFunction takes two records and outputs one that contains the merged result; the operator keeps applying it until only one element remains. The user must make sure the result of reduce does not depend on the order in which elements are processed or on how they are combined.
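A minimal sketch of such a ReduceFunction for (word, count) pairs; it could be plugged in as keyedStream.window(...).reduce(new SumCounts()):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// sums the count field; addition is associative and commutative,
// so the result does not depend on arrival order or on how elements are paired up
public class SumCounts implements ReduceFunction<Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
        return Tuple2.of(a.f0, a.f1 + b.f1);
    }
}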
aggregate also collapses a window's data into one record; it is similar to reduce but more flexible. The AggregateFunction has four methods:
createAccumulator() creates an accumulator that holds the intermediate state; ideally the state is incremental, which saves storage because not every record has to be kept.
add() folds one element into the accumulator.
getResult() derives the output element from the accumulator.
merge() combines two accumulators and may reuse objects; after this call the previous accumulators are no longer used.
The window function (an AllWindowFunction) then emits, through out, the OUT value produced by the AggregateFunction; its IN type is the AggregateFunction's OUT type.
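A sketch of an AggregateFunction that keeps only an incremental (sum, count) accumulator and produces an average; the names and types are chosen for illustration. Because only two longs are kept per window instead of every record, the state stays small, which is exactly the point made above.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class AverageAggregate implements AggregateFunction<Integer, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L); // (sum, count)
    }

    @Override
    public Tuple2<Long, Long> add(Integer value, Tuple2<Long, Long> acc) {
        return Tuple2.of(acc.f0 + value, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return acc.f1 == 0 ? 0.0 : acc.f0.doubleValue() / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        // only the returned accumulator is used after merging
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}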
ProcessAllWindowFunction is similar to AllWindowFunction, but additionally provides access to a Context.
process also takes a ProcessAllWindowFunction; unlike aggregate, it is handed every element of the window rather than an already aggregated value.
apply likewise takes a ReduceFunction or an AllWindowFunction; unlike reduce and aggregate, it is handed every element of the window rather than an already aggregated value.
FoldFunction<O, T>
T fold(T accumulator, O value) throws Exception;
fold is similar to reduce and aggregate: every record is folded into an accumulator and a single output record is produced at the end. (FoldFunction has since been deprecated in favor of AggregateFunction.)
Some predefined aggregation methods (sum, min, max, minBy, maxBy); they do exactly what their names suggest.
sideOutputLateData(OutputTag<T> outputTag)
Emits late records to a side output; outputTag identifies that side-output stream.
The stream of late records can then be retrieved with SingleOutputStreamOperator#getSideOutput(OutputTag).
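Putting the two together, roughly like this (a sketch: the wrapper method is invented, and keyed is assumed to be a keyed (word, count) stream that already has timestamps and watermarks assigned):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataDemo {
    public static DataStream<Tuple2<String, Integer>> sumWithLateOutput(
            KeyedStream<Tuple2<String, Integer>, String> keyed) {
        // anonymous subclass so Flink can capture the element type of the side output
        OutputTag<Tuple2<String, Integer>> lateTag =
                new OutputTag<Tuple2<String, Integer>>("late") {};
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed
                .window(TumblingEventTimeWindows.of(Time.seconds(30)))
                .sideOutputLateData(lateTag) // late records go to the side output instead of being dropped
                .sum(1);
        // the late records come back as a regular stream
        DataStream<Tuple2<String, Integer>> late = summed.getSideOutput(lateTag);
        late.print("late");
        return summed;
    }
}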
These methods work like their DataStream counterparts, but return a keyed WindowedStream obtained in different ways.
Their behaviour matches that of AllWindowedStream, except that they operate on a single pane (the data of one key within one window).
Flink Window Transformation Operators
Overview of the window transformation operators
Digging through the source code earlier showed that Flink has many kinds of windows:
package org.apache.flink.streaming.api.windowing.assigners;

/**
 * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
 *
 * <p>In a window operation, elements are grouped by their key (if available) and by the windows to
 * which it was assigned. The set of elements with the same key and window is called a pane. When a
 * {@link Trigger} decides that a certain pane should fire the {@link
 * org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied to produce output
 * elements for that pane.
 *
 * @param <T> The type of elements that this WindowAssigner can assign windows to.
 * @param <W> The type of {@code Window} that this assigner assigns.
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The {@link WindowAssignerContext} in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(
            T element, long timestamp, WindowAssignerContext context);

    /** Returns the default trigger associated with this {@code WindowAssigner}. */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by this {@code
     * WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    /**
     * Returns {@code true} if elements are assigned to windows based on event time, {@code false}
     * otherwise.
     */
    public abstract boolean isEventTime();

    /**
     * A context provided to the {@link WindowAssigner} that allows it to query the current
     * processing time.
     *
     * <p>This is provided to the assigner by its containing {@link
     * org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}, which, in turn, gets
     * it from the containing {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
     */
    public abstract static class WindowAssignerContext {

        /** Returns the current processing time. */
        public abstract long getCurrentProcessingTime();
    }
}
The WindowAssigner parameter that the window operators take has implementation classes such as TumblingEventTimeWindows, TumblingProcessingTimeWindows, SlidingEventTimeWindows, SlidingProcessingTimeWindows, EventTimeSessionWindows, ProcessingTimeSessionWindows and GlobalWindows.
Source of the window-related methods of KeyedStream:
package org.apache.flink.streaming.api.datastream;

public class KeyedStream<T, KEY> extends DataStream<T> {

    // ... (fields, constructors and other methods omitted)

    // ------------------------------------------------------------------------
    //  Windowing
    // ------------------------------------------------------------------------

    /**
     * Windows this {@code KeyedStream} into tumbling time windows.
     *
     * <p>This is a shortcut for either {@code .window(TumblingEventTimeWindows.of(size))} or
     * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
     * set using {@link
     * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
     *
     * @param size The size of the window.
     * @deprecated Please use {@link #window(WindowAssigner)} with either {@link
     *     TumblingEventTimeWindows} or {@link TumblingProcessingTimeWindows}. For more information,
     *     see the deprecation notice on {@link TimeCharacteristic}
     */
    @Deprecated
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(TumblingProcessingTimeWindows.of(size));
        } else {
            return window(TumblingEventTimeWindows.of(size));
        }
    }

    /**
     * Windows this {@code KeyedStream} into sliding time windows.
     *
     * <p>This is a shortcut for either {@code .window(SlidingEventTimeWindows.of(size, slide))} or
     * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time
     * characteristic set using {@link
     * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
     *
     * @param size The size of the window.
     * @deprecated Please use {@link #window(WindowAssigner)} with either {@link
     *     SlidingEventTimeWindows} or {@link SlidingProcessingTimeWindows}. For more information,
     *     see the deprecation notice on {@link TimeCharacteristic}
     */
    @Deprecated
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(SlidingProcessingTimeWindows.of(size, slide));
        } else {
            return window(SlidingEventTimeWindows.of(size, slide));
        }
    }

    /**
     * Windows this {@code KeyedStream} into tumbling count windows.
     *
     * @param size The size of the windows in number of elements.
     */
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    /**
     * Windows this {@code KeyedStream} into sliding count windows.
     *
     * @param size The size of the windows in number of elements.
     * @param slide The slide interval in number of elements.
     */
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
        return window(GlobalWindows.create())
                .evictor(CountEvictor.of(size))
                .trigger(CountTrigger.of(slide));
    }

    /**
     * Windows this data stream to a {@code WindowedStream}, which evaluates windows over a key
     * grouped stream. Elements are put into windows by a {@link WindowAssigner}. The grouping of
     * elements is done both by key and by window.
     *
     * <p>A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to
     * specify when windows are evaluated. However, {@code WindowAssigners} have a default {@code
     * Trigger} that is used if a {@code Trigger} is not specified.
     *
     * @param assigner The {@code WindowAssigner} that assigns elements to windows.
     * @return The trigger windows data stream.
     */
    @PublicEvolving
    public <W extends Window> WindowedStream<T, KEY, W> window(
            WindowAssigner<? super T, W> assigner) {
        return new WindowedStream<>(this, assigner);
    }
}
The window-related operators are the two deprecated timeWindow variants (time windows), countWindow (count windows), and the window operator that takes a WindowAssigner.
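For reference, the non-deprecated style looks roughly like this (a sketch: the helper method is invented, and keyed stands for a keyed (word, count) stream such as the one built in the example below):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class NewApiWindowDemo {
    static SingleOutputStreamOperator<Tuple2<String, Integer>> tumbling30s(
            KeyedStream<Tuple2<String, Integer>, String> keyed) {
        // equivalent of the deprecated keyed.timeWindow(Time.seconds(30)): the
        // WindowAssigner is passed explicitly, so no stream time characteristic is needed
        return keyed
                .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
                .sum(1);
    }
}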
Deprecated time window operators
Processing time is used as the example here; the event-time case is analogous. From here on the new API is preferred, and the old API and the legacy DataSet batch API will not be examined in depth.
Tumbling time window
package com.zhiyong.flinkStream;

import com.zhiyong.flinkStudy.WordCountSource1ps;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @program: study
 * @description: Flink window transformation operators
 * @author: zhiyong
 * @create: 2022-03-23 23:59
 **/
public class WindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // deprecated method; it must be set for the deprecated operator below to work, otherwise the job fails

        DataStreamSource<String> data = env.addSource(new WordCountSource1ps());

        SingleOutputStreamOperator<Tuple2<String, Integer>> result1 = data.flatMap(new FlatMapFunction1())
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .timeWindow(Time.seconds(30)) // deprecated tumbling time window
                .sum(1);

        result1.print("deprecated tumbling time window");

        env.execute("streaming");
    }

    private static class FlatMapFunction1 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // split each line on whitespace and emit (word, 1) pairs
            for (String cell : value.split("\\s+")) {
                out.collect(Tuple2.of(cell, 1));
            }
        }
    }
}
Because the custom source does not attach timestamps, the job fails if env is not configured with the deprecated setting shown above:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool