Apache Flink——侧输出流(side output)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Flink——侧输出流(side output)相关的知识,希望对你有一定的参考价值。
参考技术A flink处理数据流时,经常会遇到这样的情况:处理一个数据源时,往往需要将该源中的不同类型的数据做分割(分流)处理,假如使用 filter算子对数据源进行筛选分割的话,势必会造成数据流的多次复制,造成不必要的性能浪费;flink中的侧输出,就是将数据流进行分割,而不对流进行复制的一种分流机制。flink的侧输出的另一个作用就是对延时迟到的数据进行处理,这样就可以不必丢弃迟到的数据;
简单理解就是,根据业务上的一定规则,将一个源中的数据拆分成不同的流,即主流和侧输出流。
大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。
除了从DataStream操作输出主结果流外,也可以生成任一数量的额外的侧输出流。结果流可以和主输出流的类型可以不匹配,并且侧输出流可以有不同类型。侧输出流的操作当你分流时非常有用,之前你需要先复制一个流再过滤出来,有了侧输出流,就不需要这样操作。
具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文
的.output()方法就可以了。
当使用侧输出流时,首先需要定义一个OutputTag,它将要被用来确定一个侧输出流。
如果想要获取这个侧输出流,可以基于处理之后的 DataStream 直接调用.getSideOutput() 方法,传入对应的OutputTag,这个方式与窗口API 中获取侧输出流是完全一样的。
可以从以下方法中来把数据输出到侧输出流
在以上的函数中可以用参数Context来暴露给用户发送数据到侧输出流。下面例子是用ProcessFunction来发送数据到侧输出流。
参考:
https://blog.csdn.net/rustwei/article/details/121102439
Flink 侧输出流 SideOutput
大部分的 DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。processfunction 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 sideoutput 可以定义为 OutputTag[X]对象,X 是输出流的数据类型。processfunction 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。
下面的代码演示了低于32F的温度信息进入到测输出流"freezing alert"中。
object SideOutputTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val socketStream = env.socketTextStream("hadoop102", 7777) val dataStream: DataStream[SensorReading] = socketStream.map(d => { val arr = d.split(",") SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000 }) //低温报警处理 val processStream = dataStream.process(new FreezingAlert) //打印主输出流 processStream.print("process stream") //打印侧输出流。先得到某个测输出流。 processStream.getSideOutput(new OutputTag[String]("freezing alert")).print("freezing alert") env.execute("window test") } } class FreezingAlert extends ProcessFunction[SensorReading, SensorReading] { lazy val tag = new OutputTag[String]("freezing alert") override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, collector: Collector[SensorReading]): Unit = { if (value.temperature<32){ //侧输出流 ctx.output(tag,"freezing alert for " + value.temperature) }else{ //主输出流 collector.collect(value) } } }
端口数据
[atguigu@hadoop102 ~]$ nc -lk 7777 sensor_1, 1547718200, 30 sensor_1, 1547718200, 25 sensor_1, 1547718200, 35
控制台打印
freezing alert> freezing alert for 30.0 freezing alert> freezing alert for 25.0 process stream> SensorReading(sensor_1,1547718200,35.0)
以上是关于Apache Flink——侧输出流(side output)的主要内容,如果未能解决你的问题,请参考以下文章
Flink DataStream 侧输出流 Side Output
Flink 窗口迟到很久的消息处理/side outputs 旁路输出/防止数据丢失