Flink窗口转换算子

Posted 虎鲸不是鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink窗口转换算子相关的知识,希望对你有一定的参考价值。

窗口转换算子预览

之前扒源码看到过Flink的窗口有很多种:

package org.apache.flink.streaming.api.windowing.assigners;
/**
 * A @code WindowAssigner assigns zero or more @link Window Windows to an element.
 *
 * <p>In a window operation, elements are grouped by their key (if available) and by the windows to
 * which it was assigned. The set of elements with the same key and window is called a pane. When a
 * @link Trigger decides that a certain pane should fire the @link
 * org.apache.flink.streaming.api.functions.windowing.WindowFunction is applied to produce output
 * elements for that pane.
 *
 * @param <T> The type of elements that this WindowAssigner can assign windows to.
 * @param <W> The type of @code Window that this assigner assigns.
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable 
    private static final long serialVersionUID = 1L;

    /**
     * Returns a @code Collection of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The @link WindowAssignerContext in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(
            T element, long timestamp, WindowAssignerContext context);

    /** Returns the default trigger associated with this @code WindowAssigner. */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a @link TypeSerializer for serializing windows that are assigned by this @code
     * WindowAssigner.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    /**
     * Returns @code true if elements are assigned to windows based on event time, @code false
     * otherwise.
     */
    public abstract boolean isEventTime();

    /**
     * A context provided to the @link WindowAssigner that allows it to query the current
     * processing time.
     *
     * <p>This is provided to the assigner by its containing @link
     * org.apache.flink.streaming.runtime.operators.windowing.WindowOperator, which, in turn, gets
     * it from the containing @link org.apache.flink.streaming.runtime.tasks.StreamTask.
     */
    public abstract static class WindowAssignerContext 

        /** Returns the current processing time. */
        public abstract long getCurrentProcessingTime();
    

窗口算子需要传的参数WindowAssigner有如下实现类:

源码:

package org.apache.flink.streaming.api.datastream;
public class KeyedStream<T, KEY> extends DataStream<T> 
        // ------------------------------------------------------------------------
    //  Windowing
    // ------------------------------------------------------------------------

    /**
     * Windows this @code KeyedStream into tumbling time windows.
     *
     * <p>This is a shortcut for either @code .window(TumblingEventTimeWindows.of(size)) or @code
     * .window(TumblingProcessingTimeWindows.of(size)) depending on the time characteristic set
     * using @link
     * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
     *
     * @param size The size of the window.
     * @deprecated Please use @link #window(WindowAssigner) with either @link
     *     TumblingEventTimeWindows or @link TumblingProcessingTimeWindows. For more information,
     *     see the deprecation notice on @link TimeCharacteristic
     */
    @Deprecated
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) 
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) 
            return window(TumblingProcessingTimeWindows.of(size));
         else 
            return window(TumblingEventTimeWindows.of(size));
        
    

    /**
     * Windows this @code KeyedStream into sliding time windows.
     *
     * <p>This is a shortcut for either @code .window(SlidingEventTimeWindows.of(size, slide)) or
     * @code .window(SlidingProcessingTimeWindows.of(size, slide)) depending on the time
     * characteristic set using @link
     * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
     *
     * @param size The size of the window.
     * @deprecated Please use @link #window(WindowAssigner) with either @link
     *     SlidingEventTimeWindows or @link SlidingProcessingTimeWindows. For more information,
     *     see the deprecation notice on @link TimeCharacteristic
     */
    @Deprecated
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) 
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) 
            return window(SlidingProcessingTimeWindows.of(size, slide));
         else 
            return window(SlidingEventTimeWindows.of(size, slide));
        
    

    /**
     * Windows this @code KeyedStream into tumbling count windows.
     *
     * @param size The size of the windows in number of elements.
     */
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) 
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    

    /**
     * Windows this @code KeyedStream into sliding count windows.
     *
     * @param size The size of the windows in number of elements.
     * @param slide The slide interval in number of elements.
     */
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) 
        return window(GlobalWindows.create())
                .evictor(CountEvictor.of(size))
                .trigger(CountTrigger.of(slide));
    

    /**
     * Windows this data stream to a @code WindowedStream, which evaluates windows over a key
     * grouped stream. Elements are put into windows by a @link WindowAssigner. The grouping of
     * elements is done both by key and by window.
     *
     * <p>A @link org.apache.flink.streaming.api.windowing.triggers.Trigger can be defined to
     * specify when windows are evaluated. However, @code WindowAssigners have a default @code
     * Trigger that is used if a @code Trigger is not specified.
     *
     * @param assigner The @code WindowAssigner that assigns elements to windows.
     * @return The trigger windows data stream.
     */
    @PublicEvolving
    public <W extends Window> WindowedStream<T, KEY, W> window(
            WindowAssigner<? super T, W> assigner) 
        return new WindowedStream<>(this, assigner);
    

与Window有关的算子有已经过时的2种timeWindow时间窗口、countWindow计数窗口,以及一个window窗口分配器算子。

过时的时间窗口算子

以处理时间为例子。事件时间的情况与之类似。之后尽量使用新API,但不会深究老API与老的DataSet批处理。

滚动时间窗口

package com.zhiyong.flinkStream;

import com.zhiyong.flinkStudy.FlinkWordCountDemo2FlatMapFunction;
import com.zhiyong.flinkStudy.WordCountSource1ps;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;

/**
 * @program: study
 * @description: Flink窗口转换算子
 * @author: zhiyong
 * @create: 2022-03-23 23:59
 **/
public class WindowDemo 
    public static void main(String[] args) throws Exception
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //过时方法,必须设置,才能正常使用算子,否则报错

        DataStreamSource<String> data = env.addSource(new WordCountSource1ps());

        SingleOutputStreamOperator<Tuple2<String, Integer>> result1 = data.flatMap(new FlatMapFunction1())
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() 
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception 
                        return value.f0;
                    
                )
                .timeWindow(Time.seconds(30))
                //过时的滚动时间窗口
                .sum(1);

        result1.print("过时的滚动时间窗口");

        env.execute("streaming");

    

    private static class FlatMapFunction1 implements FlatMapFunction<String,Tuple2<String,Integer>>

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception 
            for(String cell:value.split("\\\\s+"))
                out.collect(Tuple2.of(cell,1));
            
        
    


由于自定义数据源不带时间戳,不使用上述过时方法设置env会报错:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
	at akka.dispatch.OnComplete.internal(Future.scala:300)
	at akka.dispatch.OnComplete.internal(Future.scala:297)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool以上是关于Flink窗口转换算子的主要内容,如果未能解决你的问题,请参考以下文章

FLINK基础(92): DS算子与窗口单流算子KeyBy

flink window窗口算子

Flink 窗口算子

Flink流处理之窗口算子分析

08-flink-1.10.1- flink Transform api 转换算子

08-flink-1.10.1- flink Transform api 转换算子