Flink中实现自定义ProcessFunction实现定时器侧输出

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink中实现自定义ProcessFunction实现定时器侧输出相关的知识,希望对你有一定的参考价值。

在Flink中,当我们需要获取到算子的Processing Time或者Water Mark以及定时器时,可以实现ProcessFunction函数。
目前该函数主要有:K恶业的ProcessFuntion,ProcessFunction,CoPropcessFunction等,核心功能主要如下:

  • 可以使用状态计算,能够在算子中访问Keyed State
  • 可以设置定时器
  • 侧输出,可以将一部分数据发送到另外一个数据流中,而且输出的两个数据流数据类型可以不一样。

如下自定义实现一个KeyedProcessFunction:


public class MyKeyedProcessFunctionJava extends KeyedProcessFunction<String, StockPrice,String> 

    private ValueState<Long> currentTime;
    private ValueState<Double> lastPrice ;
    private static long  intervalMs = 500 ;

    OutputTag<StockPrice> highLevel ;
    OutputTag<StockPrice> middleLevel;
    OutputTag<StockPrice> lowLevel;

    public MyKeyedProcessFunctionJava(OutputTag<StockPrice> highLevel,
                                      OutputTag<StockPrice> middleLevel,
                                      OutputTag<StockPrice> lowLevel)
        this.lowLevel = lowLevel;
        this.middleLevel= middleLevel;
        this.highLevel = highLevel;

    
    @Override
    public void open(Configuration parameters) throws Exception 
        super.open(parameters);
        currentTime = getRuntimeContext().getState(new ValueStateDescriptor<Long>("currentTime",Long.class));
        lastPrice = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastPrice",Double.class));
    

    @Override
    public void processElement(StockPrice stockPrice, Context context, Collector<String> collector) throws Exception 
        double price = 0;
        if(lastPrice.value() != null) 
            price = lastPrice.value() ;
        
        long currentTimeStamp = 0;
        if(currentTime.value() != null)
            currentTimeStamp = currentTime.value();
        
        if(price < stockPrice.getPrice())
            context.timerService().deleteEventTimeTimer(currentTimeStamp);
        else
            long time = context.timestamp()+intervalMs;
            context.timerService().registerEventTimeTimer(time);
            currentTime.update(time);

        
        lastPrice.update(stockPrice.getPrice());
        if(price>10000)
            context.output(highLevel,stockPrice);
        else if(price<= 10000 && price > 3000)
            context.output(middleLevel,stockPrice);
        else
            context.output(lowLevel,stockPrice);
        


    

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception 
        SimpleDateFormat format = new SimpleDateFormat("yyy-MM-dd HH:mm:ss");
        String timeStr = format.format(timestamp);
        String warnings = String.format("warnings: time=%s ,key=%s increased",timeStr,ctx.getCurrentKey());
        out.collect(warnings);
    

    public static void main(String[] args) 
        String topic = "test001";
        Properties kafkaProps = new Properties();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer<StockPrice> consumer = new FlinkKafkaConsumer<StockPrice>(topic, new MyKafkaDeserializationSchema(), kafkaProps);

        OutputTag<StockPrice> highLevel = new OutputTag<StockPrice>("highLevel");
        OutputTag<StockPrice> middleLevel = new OutputTag<StockPrice>("middleLevel");
        OutputTag<StockPrice> lowLevel = new OutputTag<StockPrice>("lowLevel");

        SingleOutputStreamOperator<String> warning = env.addSource(consumer)
                .keyBy(stockPrice -> stockPrice.getId())
        .process(new MyKeyedProcessFunctionJava(highLevel,middleLevel,lowLevel));

        DataStream<StockPrice> highLevelStream = warning.getSideOutput(highLevel);
        DataStream<StockPrice> middleLevelStream = warning.getSideOutput(middleLevel);
        DataStream<StockPrice> lowLevelStream = warning.getSideOutput(lowLevel);





    

以上是关于Flink中实现自定义ProcessFunction实现定时器侧输出的主要内容,如果未能解决你的问题,请参考以下文章

如何在Canvas中实现自定义路径动画

在具有条件的 keras 中实现自定义损失函数

为啥不推荐使用 JScript 在 WiX 中实现自定义操作?

在 Freemarker 中实现自定义 Escaper

如何在黄瓜中实现自定义监听器?

如何在自定义 Spring 存储库中实现自定义方法? [复制]