大数据(9e)Flink侧输出流
Posted 小基基o_O
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(9e)Flink侧输出流相关的知识,希望对你有一定的参考价值。
文章目录
概述
窗口可以设置允许迟到的时间(allowedLateness),但仍可能有数据在窗口关闭之后才到达
Flink提供了侧输出流(sideOutput)来处理关窗之后到达的数据
环境
WIN10+IDEA+JDK1.8+FLINK1.14
<!-- Build properties: Java 8 source/target, Flink 1.14.6, Scala binary 2.12 -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<!-- Flink dependencies; versions and Scala suffixes are resolved from the properties above via ${...} -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
OutputTag介绍
OutputTag是一种命名标记,用于标记算子中的侧输出
实现分流
ctx.output:向由OutputTag标识的侧输出发出记录
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class Hi
public static void main(String[] args) throws Exception
//创建执行环境,设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//定义输出标签
OutputTag<Integer> o1 = new OutputTag<Integer>("除以3余1") ;
OutputTag<Integer> o2 = new OutputTag<Integer>("除以3余2") ;
//创建流
SingleOutputStreamOperator<Integer> d = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
//处理
SingleOutputStreamOperator<Integer> s = d.process(new ProcessFunction<Integer, Integer>()
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out)
//分流
if (value % 3 == 2)
ctx.output(o2, value); //ctx.output:向由OutputTag标识的侧输出发出记录
else if (value % 3 == 1)
ctx.output(o1, value); //ctx.output:向由OutputTag标识的侧输出发出记录
else
out.collect(value);
);
//输出
s.print("被3整除");
s.getSideOutput(o1).print(o1.getId());
s.getSideOutput(o2).print(o2.getId());
//环境执行
env.execute();
-
测试结果
-
被3整除> 0
除以3余1> 1
除以3余2> 2
被3整除> 3
除以3余1> 4
除以3余2> 5
被3整除> 6
除以3余1> 7
除以3余2> 8
被3整除> 9
处理迟到数据
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class Hi
public static void main(String[] args) throws Exception
//创建执行环境,设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//定义测输出流的输出标签
OutputTag<String> outputTag = new OutputTag<String>("迟到标签") ;
//创建流,添加自定义数据源
SingleOutputStreamOperator<String> d = env.addSource(new SourceFunction<String>()
@Override
public void run(SourceContext<String> ctx)
//发送水位线
ctx.emitWatermark(new Watermark(1999L));
//发送2条数据,其中1条迟到
ctx.collectWithTimestamp("1998", 1998L);
ctx.collectWithTimestamp("2000", 2000L);
@Override
public void cancel()
);
//处理
SingleOutputStreamOperator<String> s = d.process(new ProcessFunction<String, String>()
@Override
public void processElement(String value, Context ctx, Collector<String> out)
//获取水位线
long watermark = ctx.timerService().currentWatermark();
//判断是否迟到
if (ctx.timestamp() > watermark)
//冇迟到
out.collect(value);
else
//迟到:向outputTag发送数据
ctx.output(outputTag, value);
);
//输出
s.print("主流输出");
s.getSideOutput(outputTag).print("侧输出");
//环境执行
env.execute();
-
发送1999水位线,然后发送两条数据,测试结果如下
-
侧输出> 1998
主流输出> 2000
处理关窗之后到达的数据
开窗后调用 .sideOutputLateData(outputTag),即可把关窗之后才到达的数据发送到侧输出流
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
public class Hi
public static void main(String[] args) throws Exception
//创建执行环境,设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//定义测输出流的输出标签
OutputTag<String> outputTag = new OutputTag<String>("迟到标签") ;
//创建流,添加自定义数据源
SingleOutputStreamOperator<String> d = env.addSource(new SourceFunction<String>()
@Override
public void run(SourceContext<String> ctx)
ctx.collectWithTimestamp("a", 4000L);
ctx.collectWithTimestamp("b", 5000L);
ctx.emitWatermark(new Watermark(5999L)); //发送水位线,触发【3000~5999】的窗口关闭
ctx.collectWithTimestamp("c", 5000L);
ctx.collectWithTimestamp("d", 5000L);
ctx.collectWithTimestamp("e", 6000L);
ctx.collectWithTimestamp("f", 7000L);
@Override
public void cancel()
);
//处理
SingleOutputStreamOperator<String> s = d
//事件时间滚动窗口
.windowAll(TumblingEventTimeWindows.of(Time.seconds(3L)))
//侧输出
.sideOutputLateData(outputTag)
//拼接字符串
.reduce((a, b) -> a + "," + b);
//输出
s.print("主流输出");
s.getSideOutput(outputTag).print("侧输出");
//环境执行
env.execute();
-
中途发送水位线,触发关窗,测试结果如下
-
主流输出> a,b
侧输出> c
侧输出> d
主流输出> e,f
以上是关于大数据(9e)Flink侧输出流的主要内容,如果未能解决你的问题,请参考以下文章