14-flink-1.10.1-flink ProcessFunction API
Posted 逃跑的沙丁鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了14-flink-1.10.1-flink ProcessFunction API相关的知识,希望对你有一定的参考价值。
1 ProcessFunction API
我们之前学的转换算子是无法获取访问时间的时间搓信息和水位线信息的。而这在一些应用场景下,极为总要,例如MapFunction这样的map转换算子就无法访问时间戳或者当前时间的事件时间。
基于此,DataStream API 提供了一些列的Low-level转换算子。可以访问时间戳,watermark以及注册定时事件。还可以输出特定的一些事件。例如超时时间等。process function 用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,flink sql 就是使用process function实现的
flink 提供了8个process function:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
2 KeyedProcessFunction
这里我们重点介绍KeyedProcessFunction。
KeyedProcessFunction用来操作KeyedStream .KeyedprocessFunction会处理流的每一个元素。输出为0个,1个或者多个元素。所有的Process Function 都继承自RichFunction接口,所以都有open() close(),getRunctionContext()等方法。而,KeyedProcessFunction[KEY,In,OUT]还额外提供两个方法:
processElement(v:In,ctx:Context,out:Collector[OUT])。六中的每一个元素,都会调用这个方法,调用结果将会放在Collector数据类型中输出,context
可以访问元素的时间戳,元素的key,以及timerService时间服务。Context还可以将结果输出到别的流(side outputs)
onTimer(timestamp:Long,ctx:OnTimerContext,out:Collector[OUT])是一个回调函数。当之前注册的定时器触发时调用,参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合,Ontimercontex和processElement的Context参数一样,提供了上线文的一些信息,例如,定时器触发的时间信息(时间时间或者处理时间)
3 定时器TimerService
4 代码整体演示
package com.study.liucf.unbounded.process
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import com.study.liucf.bean.{LiucfSensorReding, LiucfSensorRedingAgg}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* @Author liucf
* @Date 2021/10/13
* process api 这段代码没有测试执行,只讲解使用过程
*/
object LiucfProcessTest {
val EXAMPLE_SIZE=5L
def main(args: Array[String]): Unit = {
/**创建flink流式处理环境*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//为从此环境创建的所有流设置时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
/**flink 从外部命令获取参数*/
val paramTool = ParameterTool.fromArgs(args)
val ip = paramTool.get("ip")
val port = paramTool.getInt("port")
/**监听一个流式输入端口*/
// val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
val sockerInput: DataStream[String] = env.socketTextStream(ip, port)
/**数据流转换*/
val transRes = sockerInput
.map(d=>{
val arr = d.split(",")
LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble)
})
// .process(new LiucfProcessFunction("这个函数是没有被实现过的")) //如果基于当前流进行process处理直接在这里调用就就可以了
.keyBy(_.id)//按照传感器id分组,然后后面就可以是用keyedprocess类
.process(new LiucfKeyedProcessFunction())// 参数可以使用KeyedProcessFunction
/**输出结果,打印到控制台*/
transRes.print("result")
/**启动流式处理*/
env.execute(" liucf process test")
}
}
/**自定义processfunction
* 继承
* KeyedProcessFunction[String,LiucfSensorReding,(String,Double)]
* KeyedProcessFunction[key的类型,输入数据类型,输出的数据类型]
*
*/
class LiucfKeyedProcessFunction extends KeyedProcessFunction[String,LiucfSensorReding,(String,Double)]{
var myvalueState:ValueState[Int] = _
/**
* 生命周期函数
* 为什么还有生命周期函数呢?
* 因为这是继承来的,
* LiucfKeyedProcessFunction 继承 KeyedProcessFunction
* KeyedProcessFunction 继承 AbstractRichFunction
* AbstractRichFunction 继承 RichFunction
* RichFunction 有生命周期函数
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
myvalueState= getRuntimeContext.getState(new ValueStateDescriptor[Int]("myvalueState",classOf[Int]))
}
/**
* 每条数据来的时候都会调用这个方法
*
* @param value The input value.
* @param ctx A KeyedProcessFunction.Context that allows querying the timestamp of the element and getting a TimerService for registering timers and querying the time. The context is only valid during the invocation of this method, do not store it.
* @param out The collector for returning result values.
*/
override def processElement(value: LiucfSensorReding,
ctx: KeyedProcessFunction[String, LiucfSensorReding, (String, Double)]#Context,
out: Collector[(String, Double)]
): Unit = {
ctx.getCurrentKey //Get key of the element being processed.
ctx.timestamp()//Timestamp of the element currently being processed or timestamp of a firing timer.
ctx.timerService().currentWatermark()//Returns the current event-time watermark.
/**
* 注册一个定时器,启东时间是当前时间戳之后1分钟启动
* 当时间到了怎么处理数据呢?调用什么吗?
* 调用def onTimer()方法来实现想处理的过程
* 这里可以定义多个定时器,是根据定时器的时间戳来区分不同的定时器的。
* 注意即使定义了多个定时器都是到点后出发onTime()方法,不区分操作的方法。
* 如果想区分操作则需要再onTime() 方法内部根据时间进行判断
* 定时器还可以删除
*
* */
ctx.timerService().registerEventTimeTimer(ctx.timestamp()+60000L)//
}
/**
* 当定时器时间戳到点的时候,就行闹钟时间到了的时候触发这个方法
* @param timestamp 触发定时器的时间戳,什么时候触发,这个时间戳就是什么时候的时间戳
* @param ctx 上下文
* @param out 输出数据
*/
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[String, LiucfSensorReding, (String, Double)]#OnTimerContext,
out: Collector[(String, Double)]): Unit = {
}
}
5 KeyedProcessFunction-实例
需求:如果同一个采集器联系采集到的十条数据都在持续上升那么就进行告警
package com.study.liucf.unbounded.process
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* @Author liucf
* @Date 2021/10/13
* 需求:如果同一个采集器联系采集到的十条数据都在持续上升那么就进行告警
* 分析:不可以使用窗口进行划分处理,因为夸窗口的连续上升无法监控到,所以需要使用定时器
*/
object LiucfKedProcessTest {
val EXAMPLE_SIZE=5L
def main(args: Array[String]): Unit = {
/**创建flink流式处理环境*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//为从此环境创建的所有流设置时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
/**flink 从外部命令获取参数*/
val paramTool = ParameterTool.fromArgs(args)
val ip = paramTool.get("ip")
val port = paramTool.getInt("port")
/**监听一个流式输入端口*/
// val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
val sockerInput: DataStream[String] = env.socketTextStream(ip, port)
/**数据流转换*/
val transRes = sockerInput
.map(d=>{
val arr = d.split(",")
LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble)
})
.keyBy(_.id)
.process(new LiucfKedProcessForTempIncreWraning(10000L))//处理连续上升告警
/**输出结果,打印到控制台*/
transRes.print("result")
/**启动流式处理*/
env.execute(" liucf process test")
}
}
/**自定义KeyedProcessFunction
* 继承
* KeyedProcessFunction[String,LiucfSensorReding,String]
* KeyedProcessFunction[key的类型,输入数据类型,输出的数据类型]
* 处理告警的策略,就是保存好上一条采集到的数据的温度值状态,然后和新采集温度信息比较
* 如果温度新的温度是上升到就定义一个新的定时器,如果温度下降了就清除定时器就可以了,这样interval内没有被清除的定时器到点就会告警出来
* @param interval 连续多长时间内
*
*/
class LiucfKedProcessForTempIncreWraning(interval:Long) extends KeyedProcessFunction[String,LiucfSensorReding,String]{
/**定义状态保存上一次温度值,再定义一个状态保存上一次定义的定时器的时间戳,用于删除定时器*/
lazy val lastTempState:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTempState",classOf[Double]))
lazy val timerTsState:ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timerTsState",classOf[Long]))
override def processElement(value: LiucfSensorReding,
ctx: KeyedProcessFunction[String, LiucfSensorReding, String]#Context,
out: Collector[String]): Unit = {
println("开始processElement。。。")
/**
* 取出上一次的温度值,和定时器时间差
* 定时器时间戳没赋值时是0是表示1970-01-01,也就是当第一条采集的温度数据到达时,当前时间戳状态没有赋值过取默认值0
* 这个可以用来判断是不是第一条数据
*/
val lastTempValue=lastTempState.value() //取出上一次的温度值
val timerTsValue = timerTsState.value()
println("lastTempValue="+lastTempValue+" timerTsValue="+timerTsValue)
//把新采集到的温度值更新到状态里用于下一次再来一条温度数据时比较的时候用
lastTempState.update(value.temperature)
//当前采集到的温度数据和上一次记录的温度数据状态进行比较
if(value.temperature>lastTempValue && timerTsValue==0){
//如果温度上升,且没有定时器的时候。注册当前新采集到的温度数据到达时候的时间戳之后interval秒的定时器,
// 这里简单的使用flink process time 来处理
val ts = ctx.timerService().currentProcessingTime() + interval //定时器时间戳
//注册定时器
ctx.timerService().registerProcessingTimeTimer(ts)
//吧当前定时的定时时间戳保存到值状态里
timerTsState.update(ts)
println("开始定时。。。")
} else if(value.temperature<lastTempValue){
//如果温度较上一次采集的数据温度下降了,那么删除和清空定时器时间戳状态
//这样操作后,一下一次温度上升的时候又能满足if(value.temperature>lastTempValue && timerTsValue==0)里的条件了
ctx.timerService().deleteProcessingTimeTimer(timerTsValue)
timerTsState.clear()
println("温度开始下降...")
}
}
/**
*
* @param timestamp 触发器时间戳,
* @param ctx 上下文环境
* @param out 输出告警信息
*/
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[String, LiucfSensorReding, String]#OnTimerContext,
out: Collector[String]): Unit = {
out.collect("传感器:"+ctx.getCurrentKey+" 在 "+timestamp +" 后连续 "+interval/1000 +"秒采集到的温度值连续升高")
//告警触发后定时器不用删除了,但是告警时间戳状态需要清除,方面下一次重头赋值状态。
timerTsState.clear()
}
}
6 processfunction-自定义侧输出流
需求:把采集到的温度分为主流(高温流)和侧边流(低温流)分别进行输出
package com.study.liucf.unbounded.process
import org.apache.flink.streaming.api.functions.ProcessFunction
import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* @Author liucf
* @Date 2021/10/13
* processfunction实现侧输出流
* 已经提供的split,select已经弃用了,推荐使用processfunction来实现侧输出流
*/
object LiucfProcessSideOutputStreamTest {
val EXAMPLE_SIZE=5L
def main(args: Array[String]): Unit = {
/**创建flink流式处理环境*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//为从此环境创建的所有流设置时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
/**flink 从外部命令获取参数*/
val paramTool = ParameterTool.fromArgs(args)
val ip = paramTool.get("ip")
val port = paramTool.getInt("port")
/**监听一个流式输入端口*/
// val sockerInput: DataStream[String] = env.socketTextStream("192.168.109.151", 9999)
val sockerInput: DataStream[String] = env.socketTextStream(ip, port)
/**数据流转换*/
val transRes = sockerInput
.map(d=>{
val arr = d.split(",")
LiucfSensorReding(arr(0),arr(1).toLong,arr(2).toDouble)
})
val highTempDateStream = transRes
.process(new LiucfSideOutputStream(10))
//输出主流
highTempDateStream.print("high")
//输出侧边流
highTempDateStream
.getSideOutput(
new OutputTag[(String,Long,Double)]("low")
).print("low-Temperature")
/**输出结果,打印到控制台*/
// transRes.print("result")
/**启动流式处理*/
env.execute(" liucf process test")
}
}
/**
* 自定义侧输出流,继承ProcessFunction
* ProcessFunction[LiucfSensorReding,LiucfSensorReding]:
* ProcessFunction[输入数据类型,输出的主流的类型]
* @param threshold 判断标准
*/
class LiucfSideOutputStream(threshold:Long) extends ProcessFunction[LiucfSensorReding,LiucfSensorReding]{
/**
* 每个数据到达的时候都要经过这个方法的处理
* @param value 输入流 数据类型
* @param ctx flink 上下文
* @param out 输出流 数据类型(主流的数据类型)
*/
override def processElement(value: LiucfSensorReding,
ctx: ProcessFunction[LiucfSensorReding, LiucfSensorReding]#Context,
out: Collector[LiucfSensorReding]): Unit = {
if(value.temperature<threshold){//如果小于标准温度认为是低温,划分到侧输出流里面
ctx.output(new OutputTag[(String,Long,Double)]("low"), //侧边流名称
(value.id,value.timestamp,value.temperature) //侧边流数据结构
)
}else{//或者不进入低温流,划分到主流里面
out.collect(value)
}
}
}
以上是关于14-flink-1.10.1-flink ProcessFunction API的主要内容,如果未能解决你的问题,请参考以下文章