Flink BroadCastState实现事件流广播流 双流connect(java版本)

Posted zcx_bigdata

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink BroadCastState实现事件流广播流 双流connect(java版本)相关的知识,希望对你有一定的参考价值。

需求:

事件流(kafka中):userID,eventTime,eventType,productID

广播流(mysql中):userID,userName,userAge

1.根据广播流中的用户数据将事件流中的数据补全:userID,eventTime,eventType,productID,userName,userAge

2.修改广播流中的数据,新合并后的结果数据实时更新(事件流可以捕捉到广播流数据的变化)


实现方法:

1.flink消费kafka数据,用mapfunction处理数据时直接查询mysql中的数据进行补全,性能差,因为每次新到一条数据都要去mysql现查;

2.将广播流的数据放入redis中,用mapfunction处理数据时从redis中查询数据进行补全,性能还凑合,每次新到一条数据都要去redis中查;

3.在flink中实现双流join,但是如果对mysql中的数据进行更新了,该流(mysql所在的流)需要及时更新数据,效率差;

4.采用双流connect+broadCastState(广播流state),广播流会实时从mysql中读取最新数据,放入broadCastState中,事件流从broadCastState获取广播流中的数据,效率高。

核心代码

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties=new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"zcx4:9092,zcx5:9092,zcx6:9092");
        FlinkKafkaConsumer kafka = new FlinkKafkaConsumer("zcx1", new SimpleStringSchema(), properties);

        DataStreamSource kafkadataStreamSource = env.addSource(kafka);
        SingleOutputStreamOperator kafkaDs = kafkadataStreamSource.process(new ProcessFunction<String, Operation>() 

            @Override
            public void processElement(String s, ProcessFunction<String, Operation>.Context context, Collector<Operation> collector) throws Exception 
                JSONObject jsonObject = JSON.parseObject(s);
                String userID = jsonObject.getString("userID");
                String eventTime = jsonObject.getString("eventTime");
                String eventType = jsonObject.getString("eventType");
                int productID = jsonObject.getIntValue("productID");
                collector.collect(new Operation(userID, eventTime, eventType, productID));

            
        );

        DataStreamSource<Map<String,UserInfo>> mysqlDs = env.addSource(new RichSourceFunction<Map<String,UserInfo>>() 
            boolean isRunning = true;
            Connection connection = null;
            PreparedStatement preparedStatement = null;

            @Override
            public void open(Configuration parameters) throws Exception 
                Class.forName("com.mysql.jdbc.Driver");
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
                preparedStatement = connection.prepareStatement("select * from user_info");
            

            @Override
            public void close() throws Exception 
                if (null != preparedStatement) 
                    preparedStatement.close();
                
                if (null != connection) 
                    connection.close();
                
            

            @Override
            public void run(SourceContext<Map<String,UserInfo>> sourceContext) throws Exception 
                while (isRunning) 
                    ResultSet resultSet = preparedStatement.executeQuery();
                    while (resultSet.next()) 
                        HashMap<String,UserInfo> hashMap=new HashMap<>();
                        hashMap.put(resultSet.getString("userID"),
                                new UserInfo(resultSet.getString("userName"),resultSet.getInt("userAge")));
                        sourceContext.collect(hashMap);
                    
                    Thread.sleep(10000);
                
            

            @Override
            public void cancel() 
                isRunning = false;
            
        );

//        注意:broadcaststate是MapStateDescriptor,map类型,所以需要將mysqlsource的数据处理成map类型
        BroadcastStream<Map<String, UserInfo>> broadcastStream = mysqlDs.broadcast(new MapStateDescriptor<String, UserInfo>("broadcastState", String.class, UserInfo.class));

        //双流connect
        BroadcastConnectedStream connect = kafkaDs.connect(broadcastStream);
        connect.process(new BroadcastProcessFunction<Operation,Map<String,UserInfo>,UserOperation>() 
            MapStateDescriptor broadCastStateDescriptor= new MapStateDescriptor<String, UserInfo>("broadcastState", String.class, UserInfo.class);
            @Override
            //从broadcastState获取广播流中的数据
            public void processElement(Operation operation, BroadcastProcessFunction<Operation, Map<String, UserInfo>, UserOperation>.ReadOnlyContext readOnlyContext, Collector<UserOperation> collector) throws Exception 
                ReadOnlyBroadcastState<String,UserInfo> broadcastState = readOnlyContext.getBroadcastState(broadCastStateDescriptor);
                if(broadcastState.contains(operation.userID))
                    UserInfo userInfo = broadcastState.get(operation.userID);
                    collector.collect(new UserOperation(operation.userID,operation.eventTime,operation.eventType,operation.productID,userInfo.userName,userInfo.userAge));
                
            

            @Override
            //将广播流中数据放入broadcastState
            public void processBroadcastElement(Map<String, UserInfo> stringUserInfoMap, BroadcastProcessFunction<Operation, Map<String, UserInfo>, UserOperation>.Context context, Collector<UserOperation> collector) throws Exception 
                BroadcastState<String, UserInfo> broadcastState = context.getBroadcastState(broadCastStateDescriptor);
                Iterator<String> iterator = stringUserInfoMap.keySet().iterator();
                if(iterator.hasNext())
                    String next = iterator.next();
                    broadcastState.put(next,stringUserInfoMap.get(next));
                
            
        ).print();

        env.execute();

测试

1.启动flink程序后,往kafka中写入数据:

"userID": "user_3", "eventTime": "2022-02-01 12:19:47", "eventType": "browse", "productID": 1

可以从console看到:

UserOperationuserID='user_3', eventTime='2022-02-01 12:19:47', eventType='browse', productID=1, userName='user_name3', userAge=30

双流合并成功。

2.修改mysql中的数据,将user_3的userAge=100,再次向kafka中写入上述数据,可以看到:

UserOperationuserID='user_3', eventTime='2022-02-01 12:19:47', eventType='browse', productID=1, userName='user_name3', userAge=100

合并后的数据中 涉及到原来广播流的数据是最新的。

以上是关于Flink BroadCastState实现事件流广播流 双流connect(java版本)的主要内容,如果未能解决你的问题,请参考以下文章

从0到1Flink的成长之路-Flink Action 综合案例-BroadcastState 动态更新

从0到1Flink的成长之路-Flink Action 综合案例-BroadcastState 动态更新

19.BroadcastState-动态更新规则配置需求数据代码步骤代码实现

19.BroadcastState-动态更新规则配置需求数据代码步骤代码实现

Flink 使用 Broadcast State 的4个注意事项

Flink 使用 Broadcast State 的4个注意事项