Flink广播状态和定时器实现word_count有效时间1分钟案例

Posted 猫猫爱吃小鱼粮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink广播状态和定时器实现word_count有效时间1分钟案例相关的知识,希望对你有一定的参考价值。

1、广播流传入id, name,非广播流传入id, count,有效时间1分钟,实现word_count;

2、代码实现

public class Demo09 
    public static void main(String[] args) throws Exception 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // id,count
        DataStreamSource<String> main = env.socketTextStream("localhost", 8888);
        // id,name
        DataStreamSource<String> line = env.socketTextStream("localhost", 9999);

        // 广播状态是一种特殊的operatorState,创建状态描述器
        MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<>("rule", String.class, String.class);
        // 创建广播状态
        BroadcastStream<String> broadcastStream = line.broadcast(mapStateDescriptor);

        //非广播流和广播流进行关联
        main.keyBy(e->e.split(",")[0]).connect(broadcastStream)
                .process(new KeyedBroadcastProcessFunction<String, String, String, Tuple2<String,Integer>>() 
                    private MapState<String, Integer> countState;

                    @Override
                    public void onTimer(long timestamp, KeyedBroadcastProcessFunction<String, String, String, Tuple2<String, Integer>>.OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception 
                        countState.clear();
                    

                    // 处理非广播流
                    @Override
                    public void processElement(String value, KeyedBroadcastProcessFunction<String, String, String, Tuple2<String, Integer>>.ReadOnlyContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception 
                        // 用于获取广播状态中的数据
                        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(mapStateDescriptor);
                        // 用于记录count的结果
                        countState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Integer>("count", String.class, Integer.class));
                        // 将广播流和非广播流进行匹配,处理非广播流中数据 id/count
                        String[] fields = value.split(",");
                        String id = fields[0];
                        Integer count = Integer.valueOf(fields[1]);
                        // 获取广播流中的数据 id/name
                        String name = state.get(id);
                        ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis()+1000*60);
                        System.out.println(name+"=>有效时间1分钟");
                        // 读取历史数据中的 count
                        Integer oldCount = countState.get(name);
                        if(oldCount == null)
                        
                            oldCount = 0;
                        
                        count += oldCount;
                        countState.put(name,count);
                        out.collect(new Tuple2<String,Integer>(name,count));
                    

                    // 处理广播流
                    @Override
                    public void processBroadcastElement(String value, KeyedBroadcastProcessFunction<String, String, String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception 
                        // 和上面创建广播状态一样的状态描述器,获取状态
                        BroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
                        // 对状态进行处理
                        String[] fields = value.split(",");
                        // 将数据添加到广播状态中 id/name
                        broadcastState.put(fields[0],fields[1]);

                    
                ).print();

        env.execute("broadcast=>");
    

3、测试数据

  • 输入数据
  • 广播流输入:
  • 1,a
  • 2,b
  • 3,c
  • 非广播流输入:
  • 1,1
  • 2,5
  • 3,6
  • 1,2
  • 输出结果:
  • 4> (a,1)
  • 2> (b,5)
  • 3> (c,6)
  • 4> (a,3)

以上是关于Flink广播状态和定时器实现word_count有效时间1分钟案例的主要内容,如果未能解决你的问题,请参考以下文章

“广播状态”为 Flink 的 CEP 库解除了“动态模式”功能的实现是啥意思?

Flink-状态与容错-Broadcast State--flink1.13

Apache Flink-在不使用广播状态的情况下更新操作员中的配置

Flink 的广播状态行为

Flink Broadcast State 实战指南

Flink Broadcast State 实战指南