19. BroadcastState - Dynamically Updating Rule Configuration: Requirements, Data, Code Steps, and Implementation
This article is based on study notes from the Flink 1.12 (2021) 黑马程序员 video course.
19. BroadcastState - Dynamically Updating Rule Configuration
19.1. Requirements
19.2. Data
19.3. Code Steps
19.4. Implementation
19. BroadcastState - Dynamically Updating Rule Configuration
19.1. Requirements
During development, when you need to push down / broadcast low-throughput streams such as configuration or rules to all downstream tasks, Broadcast State is the right tool. Broadcast State was introduced in Flink 1.5.
The downstream tasks receive these configurations and rules, store them as Broadcast State, and apply them to the computation of another data stream.
- Example scenarios
1) Dynamically updating computation rules: if an event stream must be processed according to the latest rules, the rules can be broadcast to downstream tasks as broadcast state.
2) Enriching events with extra fields in real time: if an event stream must be enriched in real time with users' basic profile data, that profile data can be broadcast to downstream tasks as broadcast state.
There is an event stream (a user behavior log) that contains a user id but no user details.
There is a config/rule stream (a user-info stream) that contains the users' detailed information.
We want to join the event stream with the config stream so that each log record carries the user's details, e.g. (user id, details, action); a sample joined record is shown below.
Since the config/rule stream (the user-info stream) is small, we can broadcast it as state.
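For illustration only (the field values are made up; the shape matches the Tuple6 result built in section 19.4), one joined record would look like:
(user_1, 2022-06-14 00:15:00, type_1, 2, 张三, 10)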
19.2. Data
/**
 * Random event stream -- high volume
 * user id, time, type, product id
 * <userID, eventTime, eventType, productID>
 */
public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
    private boolean isRunning = true;

    @Override
    public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
        Random random = new Random();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        while (isRunning) {
            int id = random.nextInt(4) + 1;
            String user_id = "user_" + id;
            String eventTime = df.format(new Date());
            String eventType = "type_" + random.nextInt(3);
            int productId = random.nextInt(4);
            ctx.collect(Tuple4.of(user_id, eventTime, eventType, productId));
            Thread.sleep(500);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
/**
 * Config/rule/user-info stream -- low volume
 * <userID, <name, age>>
 */
/*
CREATE TABLE `user_info` (
`userID` varchar(20) NOT NULL,
`userName` varchar(10) DEFAULT NULL,
`userAge` int(11) DEFAULT NULL,
PRIMARY KEY (`userID`) USING BTREE
) ENGINE=MyISAM DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
INSERT INTO `user_info` VALUES ('user_1', '张三', 10);
INSERT INTO `user_info` VALUES ('user_2', '李四', 20);
INSERT INTO `user_info` VALUES ('user_3', '王五', 30);
INSERT INTO `user_info` VALUES ('user_4', '赵六', 40);
*/
public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
    private boolean flag = true;
    private Connection conn = null;
    private PreparedStatement ps = null;
    private ResultSet rs = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
        String sql = "select `userID`, `userName`, `userAge` from `user_info`";
        ps = conn.prepareStatement(sql);
    }

    @Override
    public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
        while (flag) {
            Map<String, Tuple2<String, Integer>> map = new HashMap<>();
            rs = ps.executeQuery();
            while (rs.next()) {
                String userID = rs.getString("userID");
                String userName = rs.getString("userName");
                int userAge = rs.getInt("userAge");
                // Map<String, Tuple2<String, Integer>>
                map.put(userID, Tuple2.of(userName, userAge));
            }
            ctx.collect(map);
            Thread.sleep(5000); // re-read the user config every 5 s
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        // Close resources in reverse order of creation
        if (rs != null) rs.close();
        if (ps != null) ps.close();
        if (conn != null) conn.close();
    }
}
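A note on dependencies: the MySQLSource above needs the MySQL JDBC driver on the classpath in addition to the Flink dependencies. A typical Maven coordinate is sketched below (the version is an assumption; pick one that matches your MySQL server):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <!-- version is an assumption; align it with your MySQL server -->
    <version>5.1.47</version>
</dependency>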
19.3. Code Steps
1. env
2. source
  -1. Build the real-time event stream - a custom random source
      <userID, eventTime, eventType, productID>
  -2. Build the config stream - from MySQL
      <userID, <name, age>>
3. transformation
  -1. Define the state descriptor (see the note on the key type after this list)
      MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
          new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
  -2. Broadcast the config stream
      BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
  -3. Connect the event stream with the broadcast stream
      BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS = eventDS.connect(broadcastDS);
  -4. Process the connected stream - enrich the user info in the event stream from the config stream
4. sink
5. execute
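A note on the key type: the demo stores the entire user map under a single key of type Void (written and read with the key null), so every broadcast element replaces the whole configuration at once. As a sketch of an alternative design (my own variation, not from the original course), the broadcast state could instead be keyed by userID, making lookups in processElement direct state reads:

// Alternative descriptor: one state entry per user (a sketch; same data types assumed)
MapStateDescriptor<String, Tuple2<String, Integer>> perUserDescriptor =
        new MapStateDescriptor<>("config", Types.STRING, Types.TUPLE(Types.STRING, Types.INT));
// In processBroadcastElement: spread the incoming map into individual entries
//   BroadcastState<String, Tuple2<String, Integer>> state = ctx.getBroadcastState(perUserDescriptor);
//   state.clear();
//   for (Map.Entry<String, Tuple2<String, Integer>> e : value.entrySet()) {
//       state.put(e.getKey(), e.getValue());
//   }
// In processElement: look up one user directly
//   Tuple2<String, Integer> info = ctx.getBroadcastState(perUserDescriptor).get(value.f0);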
19.4. Implementation
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
* @author tuzuoquan
* @date 2022/6/14 0:15
*/
public class BroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        //TODO 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //TODO 2.source
        //-1. Build the real-time event stream -- high volume
        //<userID, eventTime, eventType, productID>
        DataStreamSource<Tuple4<String, String, String, Integer>> eventDS = env.addSource(new MySource());

        //-2. Config/rule/user-info stream -- low volume, read from MySQL
        //<userID, <name, age>>
        DataStreamSource<Map<String, Tuple2<String, Integer>>> userDS = env.addSource(new MySQLSource());

        //TODO 3.transformation
        //-1. Define the state descriptor
        MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
                new MapStateDescriptor<>("info", Types.VOID,
                        Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));

        //-2. Broadcast the config stream
        BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = userDS.broadcast(descriptor);

        //-3. Connect the event stream with the broadcast stream
        BroadcastConnectedStream<Tuple4<String, String, String, Integer>,
                Map<String, Tuple2<String, Integer>>> connectDS = eventDS.connect(broadcastDS);

        //-4. Process the connected stream -- enrich the user info in the event stream from the config stream
        //BroadcastProcessFunction<IN1, IN2, OUT>
        SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result =
                connectDS.process(new BroadcastProcessFunction<
                        //<userID, eventTime, eventType, productID> -- the event stream
                        Tuple4<String, String, String, Integer>,
                        //<userID, <name, age>> -- the broadcast stream
                        Map<String, Tuple2<String, Integer>>,
                        //<userID, eventTime, eventType, productID, name, age> -- the result stream to collect
                        Tuple6<String, String, String, Integer, String, Integer>>() {

                    // Called for every element of the event stream
                    @Override
                    public void processElement(Tuple4<String, String, String, Integer> value,
                                               ReadOnlyContext ctx,
                                               Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                        // value is one event record: <userID, eventTime, eventType, productID>
                        // Goal: join value against the broadcast state and emit the enriched record
                        // Get the broadcast state: <userID, <name, age>>
                        ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>>
                                broadcastState = ctx.getBroadcastState(descriptor);
                        // The whole user map is stored under the single null key
                        Map<String, Tuple2<String, Integer>> map = broadcastState.get(null);
                        if (map != null) {
                            // Look up the user info for value's userID
                            String userId = value.f0;
                            Tuple2<String, Integer> tuple2 = map.get(userId);
                            String username = tuple2.f0;
                            Integer age = tuple2.f1;
                            // Collect <userID, eventTime, eventType, productID, name, age>
                            out.collect(Tuple6.of(userId, value.f1, value.f2, value.f3, username, age));
                        }
                    }

                    // Called for every element of the broadcast stream: update the broadcast state
                    @Override
                    public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value,
                                                        Context ctx,
                                                        Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                        // value is the latest snapshot queried from MySQL every 5 s
                        // Store the latest data in the broadcast state
                        BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
                        broadcastState.clear();          // clear the old data
                        broadcastState.put(null, value); // put in the new data
                    }
                });

        //TODO 4.sink
        result.print();

        //TODO 5.execute
        env.execute();
    }
    /**
     * Random event stream -- high volume
     * user id, time, type, product id
     * <userID, eventTime, eventType, productID>
     */
    public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
        private boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
            Random random = new Random();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning) {
                int id = random.nextInt(4) + 1;
                String user_id = "user_" + id;
                String eventTime = df.format(new Date());
                String eventType = "type_" + random.nextInt(3);
                int productId = random.nextInt(4);
                ctx.collect(Tuple4.of(user_id, eventTime, eventType, productId));
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    // The MySQLSource class shown in section 19.2 is defined here as an inner class as well.
}
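For reference, a few lines of the printed result stream from a local run might look like the following (illustrative values; timestamps and ordering will differ on every run):

(user_3,2022-06-14 00:16:01,type_0,2,王五,30)
(user_1,2022-06-14 00:16:01,type_2,0,张三,10)
(user_4,2022-06-14 00:16:02,type_1,3,赵六,40)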