Flink broadcast state and timers: a word_count example with a 1-minute validity period
Posted 猫猫爱吃小鱼粮
Preface: compiled by the editors of 小常识网 (cha138.com). This article shows how to implement a word_count with a 1-minute validity period using Flink broadcast state and timers.
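1. Requirement: the broadcast stream carries id,name records and the non-broadcast stream carries id,count records. Join the two streams to implement word_count, where each accumulated count is valid for 1 minute.
2. Code implementation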
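import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class Demo09 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Non-broadcast stream: id,count
        DataStreamSource<String> main = env.socketTextStream("localhost", 8888);
        // Broadcast stream: id,name
        DataStreamSource<String> line = env.socketTextStream("localhost", 9999);
        // Broadcast state is a special kind of operator state; create its state descriptor
        MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<>("rule", String.class, String.class);
        // Turn the id,name stream into a broadcast stream backed by that state
        BroadcastStream<String> broadcastStream = line.broadcast(mapStateDescriptor);
        // Key the non-broadcast stream by id and connect it to the broadcast stream
        main.keyBy(e -> e.split(",")[0]).connect(broadcastStream)
                .process(new KeyedBroadcastProcessFunction<String, String, String, Tuple2<String, Integer>>() {
                    // Keyed state holding the running count (name -> count) for the current id
                    private MapState<String, Integer> countState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Obtain the state handle once, instead of on every element
                        countState = getRuntimeContext().getMapState(
                                new MapStateDescriptor<>("count", String.class, Integer.class));
                    }

                    // Called when a processing-time timer fires: discard the current key's count
                    @Override
                    public void onTimer(long timestamp, KeyedBroadcastProcessFunction<String, String, String, Tuple2<String, Integer>>.OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                        countState.clear();
                    }

                    // Process the non-broadcast stream (id,count)
                    @Override
                    public void processElement(String value, KeyedBroadcastProcessFunction<String, String, String, Tuple2<String, Integer>>.ReadOnlyContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Read-only view of the broadcast state (id -> name)
                        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(mapStateDescriptor);
                        String[] fields = value.split(",");
                        String id = fields[0];
                        Integer count = Integer.valueOf(fields[1]);
                        // Look up the name broadcast for this id
                        String name = state.get(id);
                        if (name == null) {
                            // No id,name mapping has been broadcast yet: skip the record
                            return;
                        }
                        // Clear this key's count 1 minute from now (processing time)
                        ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 60 * 1000);
                        System.out.println(name + " => valid for 1 minute");
                        // Add the previously accumulated count
                        Integer oldCount = countState.get(name);
                        if (oldCount == null) {
                            oldCount = 0;
                        }
                        count += oldCount;
                        countState.put(name, count);
                        out.collect(Tuple2.of(name, count));
                    }

                    // Process the broadcast stream (id,name)
                    @Override
                    public void processBroadcastElement(String value, KeyedBroadcastProcessFunction<String, String, String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Same descriptor as the one used to create the broadcast stream
                        BroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
                        String[] fields = value.split(",");
                        // Store the id -> name pair in the broadcast state
                        broadcastState.put(fields[0], fields[1]);
                    }
                })
                .print();
        env.execute("broadcast=>");
    }
}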
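The expiry behaviour of the timer can be illustrated without a Flink cluster. The sketch below is a plain-Java model, not Flink code: the class TimerExpiryModel and its methods are hypothetical, a TreeMap stands in for the timer service, a HashMap stands in for the keyed MapState, and time is advanced by hand. It assumes a processing-time timer fires once the clock reaches its timestamp and that at most one timer exists per (key, timestamp). Because every element registers its own timer, a key's count is cleared whenever any earlier element's timer fires, so it can disappear less than a minute after the latest update.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model (no Flink) of how Demo09's registerProcessingTimeTimer/onTimer
// pair expires a key's count. Time is advanced by hand instead of by a real clock.
public class TimerExpiryModel {
    static final long ONE_MINUTE = 60_000L;

    // Stand-in for the keyed MapState: key (id) -> (name -> count)
    private final Map<String, Map<String, Integer>> countState = new HashMap<>();
    // Stand-in for the timer service: firing time -> keys with a timer at that time.
    // The set per timestamp mirrors "at most one timer per (key, timestamp)".
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();

    // Like processElement: register a timer for now + 1 minute, then accumulate.
    public int onElement(long now, String key, String name, int count) {
        advanceTo(now);
        timers.computeIfAbsent(now + ONE_MINUTE, t -> new TreeSet<>()).add(key);
        Map<String, Integer> state = countState.computeIfAbsent(key, k -> new HashMap<>());
        int total = state.getOrDefault(name, 0) + count;
        state.put(name, total);
        return total;
    }

    // Fire every timer due at or before now; like onTimer, clear that key's state.
    public void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            for (String key : timers.pollFirstEntry().getValue()) {
                countState.remove(key);
            }
        }
    }

    public static void main(String[] args) {
        TimerExpiryModel model = new TimerExpiryModel();
        System.out.println(model.onElement(0L, "1", "a", 1));       // 1
        System.out.println(model.onElement(50_000L, "1", "a", 2));  // 3
        System.out.println(model.onElement(65_000L, "1", "a", 4));  // 4: timer from t=0 fired at 60 s
        System.out.println(model.onElement(115_000L, "1", "a", 1)); // 1: timer from t=50 s fired at 110 s
    }
}
```

In this run the value written at 65 s is cleared at 110 s by the timer registered at 50 s, only 45 s later. If expiry should instead be measured from the most recent update, one option is to keep the last registered timestamp in a ValueState and pass it to ctx.timerService().deleteProcessingTimeTimer(...) before registering the new timer.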
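3. Test data
- Input
  - Broadcast stream (port 9999):
    - 1,a
    - 2,b
    - 3,c
  - Non-broadcast stream (port 8888):
    - 1,1
    - 2,5
    - 3,6
    - 1,2
- Output (the N> prefix added by print() is the 1-based number of the parallel subtask that emitted the record):
  - 4> (a,1)
  - 2> (b,5)
  - 3> (c,6)
  - 4> (a,3)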
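The expected values can be cross-checked with a plain-Java replay of the same join-and-sum logic. This is a hypothetical helper (BroadcastWordCountReplay is not part of Flink): a HashMap plays the broadcast state and a per-id HashMap plays the keyed MapState. It assumes all id,name records arrive before the id,count records and that the sample input is typed within one minute, so the timer is not modelled. It also omits the N> subtask prefixes, which depend on parallelism.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java replay of the Demo09 logic (no Flink): broadcast "id,name" lines
// fill a lookup table, then each "id,count" line is joined and summed per id.
public class BroadcastWordCountReplay {

    public static List<String> replay(List<String> broadcastLines, List<String> mainLines) {
        // Stand-in for the broadcast state: id -> name
        Map<String, String> broadcastState = new HashMap<>();
        for (String value : broadcastLines) {
            String[] fields = value.split(",");
            broadcastState.put(fields[0], fields[1]);
        }
        // Stand-in for the keyed MapState: id -> (name -> running count)
        Map<String, Map<String, Integer>> countStateById = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String value : mainLines) {
            String[] fields = value.split(",");
            String id = fields[0];
            int count = Integer.parseInt(fields[1]);
            String name = broadcastState.get(id);
            if (name == null) {
                continue; // no id,name mapping broadcast yet
            }
            Map<String, Integer> countState = countStateById.computeIfAbsent(id, k -> new HashMap<>());
            int total = countState.getOrDefault(name, 0) + count;
            countState.put(name, total);
            out.add("(" + name + "," + total + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> out = replay(List.of("1,a", "2,b", "3,c"), List.of("1,1", "2,5", "3,6", "1,2"));
        System.out.println(out); // [(a,1), (b,5), (c,6), (a,3)]
    }
}
```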