Flink BroadcastState: enriching an event stream from a broadcast stream with dual-stream connect (Java version)
Posted by zcx_bigdata
Requirements:
Event stream (from Kafka): userID, eventTime, eventType, productID
Broadcast stream (from MySQL): userID, userName, userAge
1. Use the user data in the broadcast stream to enrich each event-stream record into: userID, eventTime, eventType, productID, userName, userAge
2. When the broadcast-stream data is modified, the joined output must reflect the change in real time (that is, the event stream can observe changes in the broadcast stream).
Candidate approaches:
1. Have Flink consume the Kafka data and, inside a MapFunction, query MySQL directly to enrich each record (sketched below). Performance is poor, because every arriving record triggers a fresh MySQL query.
2. Load the broadcast-stream data into Redis and have the MapFunction look it up there. Performance is tolerable, but every arriving record still costs a Redis lookup.
3. Implement a dual-stream join in Flink. When the MySQL data is updated, the MySQL-backed stream has to be refreshed promptly, so efficiency is poor.
4. Use dual-stream connect plus BroadcastState: the broadcast stream continuously reads the latest data from MySQL into BroadcastState, and the event stream reads the broadcast data out of BroadcastState. This is efficient.
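For contrast, here is a minimal sketch of approach 1, assuming a hypothetical RichMapFunction (the class name MysqlLookupMap is made up here) and the user_info table that appears in the core code below. It shows where the per-record cost comes from:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import java.sql.*;

// Hypothetical sketch of approach 1: one MySQL round trip per event
public class MysqlLookupMap extends RichMapFunction<Operation, UserOperation> {
    private transient Connection connection;
    private transient PreparedStatement stmt;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
        stmt = connection.prepareStatement("select userName, userAge from user_info where userID = ?");
    }

    @Override
    public UserOperation map(Operation op) throws Exception {
        // One MySQL round trip for every incoming event -- this is the bottleneck
        stmt.setString(1, op.userID);
        String userName = null;
        int userAge = -1;
        try (ResultSet rs = stmt.executeQuery()) {
            if (rs.next()) {
                userName = rs.getString("userName");
                userAge = rs.getInt("userAge");
            }
        }
        return new UserOperation(op.userID, op.eventTime, op.eventType, op.productID, userName, userAge);
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) stmt.close();
        if (connection != null) connection.close();
    }
}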
Core code
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// Event stream: consume JSON records from Kafka and parse them into Operation objects
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zcx4:9092,zcx5:9092,zcx6:9092");
FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>("zcx1", new SimpleStringSchema(), properties);
DataStreamSource<String> kafkaDataStreamSource = env.addSource(kafka);
SingleOutputStreamOperator<Operation> kafkaDs = kafkaDataStreamSource.process(new ProcessFunction<String, Operation>() {
    @Override
    public void processElement(String s, Context context, Collector<Operation> collector) throws Exception {
        JSONObject jsonObject = JSON.parseObject(s);
        String userID = jsonObject.getString("userID");
        String eventTime = jsonObject.getString("eventTime");
        String eventType = jsonObject.getString("eventType");
        int productID = jsonObject.getIntValue("productID");
        collector.collect(new Operation(userID, eventTime, eventType, productID));
    }
});

// Broadcast stream: poll MySQL every 10 seconds so updates are picked up continuously
DataStreamSource<Map<String, UserInfo>> mysqlDs = env.addSource(new RichSourceFunction<Map<String, UserInfo>>() {
    boolean isRunning = true;
    Connection connection = null;
    PreparedStatement preparedStatement = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
        preparedStatement = connection.prepareStatement("select * from user_info");
    }

    @Override
    public void close() throws Exception {
        if (null != preparedStatement) {
            preparedStatement.close();
        }
        if (null != connection) {
            connection.close();
        }
    }

    @Override
    public void run(SourceContext<Map<String, UserInfo>> sourceContext) throws Exception {
        while (isRunning) {
            ResultSet resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                HashMap<String, UserInfo> hashMap = new HashMap<>();
                hashMap.put(resultSet.getString("userID"),
                        new UserInfo(resultSet.getString("userName"), resultSet.getInt("userAge")));
                sourceContext.collect(hashMap);
            }
            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
});

// Note: BroadcastState is described by a MapStateDescriptor (a map type),
// so the MySQL source's records must be shaped into Maps
BroadcastStream<Map<String, UserInfo>> broadcastStream = mysqlDs.broadcast(
        new MapStateDescriptor<>("broadcastState", String.class, UserInfo.class));

// Connect the two streams
BroadcastConnectedStream<Operation, Map<String, UserInfo>> connect = kafkaDs.connect(broadcastStream);
connect.process(new BroadcastProcessFunction<Operation, Map<String, UserInfo>, UserOperation>() {
    final MapStateDescriptor<String, UserInfo> broadCastStateDescriptor =
            new MapStateDescriptor<>("broadcastState", String.class, UserInfo.class);

    // Event-stream side: read the broadcast data from broadcastState to enrich each event
    @Override
    public void processElement(Operation operation, ReadOnlyContext readOnlyContext, Collector<UserOperation> collector) throws Exception {
        ReadOnlyBroadcastState<String, UserInfo> broadcastState = readOnlyContext.getBroadcastState(broadCastStateDescriptor);
        if (broadcastState.contains(operation.userID)) {
            UserInfo userInfo = broadcastState.get(operation.userID);
            collector.collect(new UserOperation(operation.userID, operation.eventTime, operation.eventType,
                    operation.productID, userInfo.userName, userInfo.userAge));
        }
        // Events whose userID is not yet in the broadcast state are silently dropped
    }

    // Broadcast-stream side: put the incoming broadcast records into broadcastState
    @Override
    public void processBroadcastElement(Map<String, UserInfo> stringUserInfoMap, Context context, Collector<UserOperation> collector) throws Exception {
        BroadcastState<String, UserInfo> broadcastState = context.getBroadcastState(broadCastStateDescriptor);
        for (Map.Entry<String, UserInfo> entry : stringUserInfoMap.entrySet()) {
            broadcastState.put(entry.getKey(), entry.getValue());
        }
    }
}).print();

env.execute();
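The Operation, UserInfo, and UserOperation POJOs are used but never shown in the post. Below is a minimal sketch consistent with how the code accesses them (public fields, as in operation.userID); the constructors and toString are assumptions inferred from the code and the console output:

// Sketch of the POJOs; public fields plus a no-arg constructor keep them Flink-POJO compatible
public class Operation {
    public String userID;
    public String eventTime;
    public String eventType;
    public int productID;

    public Operation() {}
    public Operation(String userID, String eventTime, String eventType, int productID) {
        this.userID = userID;
        this.eventTime = eventTime;
        this.eventType = eventType;
        this.productID = productID;
    }
}

public class UserInfo {
    public String userName;
    public int userAge;

    public UserInfo() {}
    public UserInfo(String userName, int userAge) {
        this.userName = userName;
        this.userAge = userAge;
    }
}

public class UserOperation {
    public String userID;
    public String eventTime;
    public String eventType;
    public int productID;
    public String userName;
    public int userAge;

    public UserOperation() {}
    public UserOperation(String userID, String eventTime, String eventType, int productID,
                         String userName, int userAge) {
        this.userID = userID;
        this.eventTime = eventTime;
        this.eventType = eventType;
        this.productID = productID;
        this.userName = userName;
        this.userAge = userAge;
    }

    @Override
    public String toString() {
        return "UserOperation{userID='" + userID + "', eventTime='" + eventTime +
                "', eventType='" + eventType + "', productID=" + productID +
                ", userName='" + userName + "', userAge=" + userAge + "}";
    }
}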
Testing
1. Start the Flink job, then write a record into Kafka (e.g. with a simple producer; a sketch follows this step):
{"userID": "user_3", "eventTime": "2022-02-01 12:19:47", "eventType": "browse", "productID": 1}
The console output shows:
UserOperation{userID='user_3', eventTime='2022-02-01 12:19:47', eventType='browse', productID=1, userName='user_name3', userAge=30}
The two streams were joined successfully.
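A minimal sketch of such a producer, assuming the broker list and topic zcx1 from the core code above:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "zcx4:9092,zcx5:9092,zcx6:9092");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Send the test record as a JSON string to the topic the job consumes
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    String record = "{\"userID\": \"user_3\", \"eventTime\": \"2022-02-01 12:19:47\", "
            + "\"eventType\": \"browse\", \"productID\": 1}";
    producer.send(new ProducerRecord<>("zcx1", "user_3", record));
}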
2. Update MySQL so that user_3's userAge becomes 100, then write the same record to Kafka again. The console shows:
UserOperation{userID='user_3', eventTime='2022-02-01 12:19:47', eventType='browse', productID=1, userName='user_name3', userAge=100}
The broadcast-stream fields in the joined output now carry the latest data. (Because the source polls MySQL every 10 seconds, an update can take up to one polling interval to become visible.)