从0到1Flink的成长之路-Flink Action 综合案例-BroadcastState 动态更新
Posted 熊老二-
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0到1Flink的成长之路-Flink Action 综合案例-BroadcastState 动态更新相关的知识,希望对你有一定的参考价值。
BroadcastState 动态更新
需求
实时过滤出配置中的用户,并在事件流中补全这批用户的基础信息。
事件流:
表示用户在某个时刻浏览或点击了某个商品,格式如下。
{"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
{"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1}
配置数据:
表示用户的详细信息,在mysql中,如下。
DROP TABLE IF EXISTS `user_info`;
CREATE TABLE `user_info` (
`userID` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`userName` varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`userAge` int(11) NULL DEFAULT NULL,
PRIMARY KEY (`userID`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `user_info` VALUES ('user_1', '张三', 10);
INSERT INTO `user_info` VALUES ('user_2', '李四', 20);
INSERT INTO `user_info` VALUES ('user_3', '王五', 30);
INSERT INTO `user_info` VALUES ('user_4', '赵六', 40);
输出结果:
(user_3,2019-08-17 12:19:47,browse,1,王五,33)
(user_2,2019-08-17 12:19:48,click,1,李四,20)
BroadcastState 概述
在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就
可以使用 BroadcastState。Broadcast State 是 Flink 1.5 引入的新特性。
下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据
流的计算中 。
场景举例
1) 、动态更新计算规则: 如事件流需要根据最新的规则进行计算,则可将规则作为广播状态广
播到下游Task中。
2) 、实时增加额外字段: 如事件流需要实时增加用户的基础信息,则可将用户的基础信息作为
广播状态广播到下游Task中。
API 使用
首先创建一个Keyed 或Non-Keyed 的DataStream,然后再创建一个BroadcastedStream,
最后通过DataStream来连接(调用connect 方法)到Broadcasted Stream 上,这样实现将
BroadcastState广播到Data Stream 下游的每个Task中。
- 如果DataStream是Keyed Stream ,则连接到Broadcasted Stream 后, 添加处理
ProcessFunction 时需要使用KeyedBroadcastProcessFunction 来实现, 下面是
KeyedBroadcastProcessFunction 的API,代码如下所示:
上面泛型中的各个参数的含义,说明如下:
KS:表示Flink 程序从最上游的Source Operator 开始构建Stream,当调用keyBy 时所依赖的
Key 的类型;
IN1:表示非Broadcast 的Data Stream 中的数据记录的类型;
IN2:表示Broadcast Stream 中的数据记录的类型;
OUT:表示经过KeyedBroadcastProcessFunction 的processElement()和
processBroadcastElement()方法处理后输出结果数据记录的类型。
- 如果Data Stream 是Non-Keyed Stream,则连接到Broadcasted Stream 后,添加处理
ProcessFunction 时需要使用BroadcastProcessFunction 来实现, 下面是
BroadcastProcessFunction 的API,代码如下所示:
上面泛型中的各个参数的含义,与前面KeyedBroadcastProcessFunction 的泛型类型中的后3 个
含义相同,只是没有调用keyBy 操作对原始Stream 进行分区操作,就不需要KS 泛型参数。
具体如何使用上面的BroadcastProcessFunction,接下来我们会在通过实际编程,来以使用
KeyedBroadcastProcessFunction 为例进行详细说明。
注意事项
- BroadcastState 是Map 类型,即K-V 类型。
- Broadcast State 只有在广播的一侧, 即在BroadcastProcessFunction 或
KeyedBroadcastProcessFunction 的processBroadcastElement 方法中可以修改。在非广播
的一侧, 即在BroadcastProcessFunction 或KeyedBroadcastProcessFunction 的
processElement 方法中只读。 - Broadcast State 中元素的顺序,在各Task 中可能不同,基于顺序的处理,需要注意。
- Broadcast State 在Checkpoint 时,每个Task 都会Checkpoint 广播状态。
- Broadcast State 在运行时保存在内存中,目前还不能保存在RocksDB State Backend 中。
代码实现
自定义数据源产生用户访问网站点击流日志数据,依据用户ID(userId)与用户信息关联
(从MySQL数据库加载数据,动态加载数据),输出结果数据。
自定义数据源产生用户点击流日志
package xx.xxxxx.flink.broadcast;
import lombok.*;
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class TrackLog {
private String userId ;
private Integer productId ;
private String trackTime ;
private String eventType ;
@Override
public String toString() {
return userId + ", " + productId + ", " + trackTime + ", " + eventType;
}
}
package xx.xxxxxx.flink.broadcast;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class TrackLogSource extends RichParallelSourceFunction<TrackLog> {
private boolean isRunning = true ;
@Override
public void run(SourceContext<TrackLog> ctx) throws Exception {
String[] types = new String[]{
"click", "browser", "search", "click", "browser", "browser", "browser",
"click", "search", "click", "browser", "click", "browser", "browser", "browser"
} ;
Random random = new Random() ;
FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS") ;
while (isRunning){
TrackLog clickLog = new TrackLog(
"uid_" + (random.nextInt(4) + 1), //
10000 + random.nextInt(10000), //
format.format(System.currentTimeMillis()), //
types[random.nextInt(types.length)]
);
ctx.collect(clickLog);
TimeUnit.MILLISECONDS.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false ;
}
}
自定义数据源,加载MySQL数据用户数据
package xx.xxxxxx.flink.broadcast;
import lombok.*;
@Setter
@Getter
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
public class UserInfo {
private String userId ;
private String userName ;
private Integer userAge ;
@Override
public String toString() {return userId + ", " + userName + ", " + userAge ;}
}
package xx.xxxxxx.flink.broadcast;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;
/**
* 自定义数据源,实时从MySQL表获取数据,实现接口RichParallelSourceFunction
*/
public class UserInfoSource extends RichSourceFunction<UserInfo> {
// 标识符,是否实时接收数据
private boolean isRunning = true;
private Connection conn = null;
private PreparedStatement pstmt = null;
private ResultSet rs = null;
@Override
public void open(Configuration parameters) throws Exception {
// 1. 加载驱动
Class.forName("com.mysql.jdbc.Driver");
// 2. 创建连接
conn = DriverManager.getConnection(
"jdbc:mysql://node1.itcast.cn:3306/?useUnicode=true&characterEncoding=utf-8",
"root",
"123456"
);
// 3. 创建PreparedStatement
pstmt = conn.prepareStatement("select userId, userName, userAge from db_flink.user_info");
}
@Override
public void run(SourceContext<UserInfo> ctx) throws Exception {
while (isRunning) {
// 1. 执行查询
rs = pstmt.executeQuery();
// 2. 遍历查询结果,收集数据
while (rs.next()) {
String id = rs.getString("userId");
String name = rs.getString("userName");
Integer age = rs.getInt("userAge");
UserInfo userInfo = new UserInfo(id, name, age);
// 输出
ctx.collect(userInfo);
}
// 每隔3秒查询一次
TimeUnit.SECONDS.sleep(3);
}
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public void close() throws Exception {
if (null != rs) rs.close();
if (null != pstmt) pstmt.close();
if (null != conn) conn.close();
}
}
使用BroadcastState进行数据流的连接关联操作
package xx.xxxxxx.flink.broadcast;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
/**
* 实时过滤出配置中的用户,并在事件流中补全这批用户的基础信息
* TODO: 用户信息存储在MySQL数据库表
*/
public class DynamicConfigUpdate {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 2. 数据源-source
// 2-1. 构建实时数据事件流: 用户行为日志,<userId, productId, trackTime, eventType>
DataStreamSource<TrackLog> logDataStream = env.addSource(new TrackLogSource());
//logDataStream.printToErr();
// 2-2. 构建配置流: 用户信息,<userId, name, age>
DataStreamSource<UserInfo> userDataStream = env.addSource(new UserInfoSource());
//userDataStream.printToErr();
// 3. 数据转换-transformation
// 3-1. 定义状态State描述符
MapStateDescriptor<String, UserInfo> decriptor = new MapStateDescriptor<>(
"userInfoState", Types.STRING, TypeInformation.of(new TypeHint<UserInfo>() {}) //
);
// 3-2. 广播流
BroadcastStream<UserInfo> broadcastDataStream = userDataStream.broadcast(decriptor);
// 3-3. 将事件流和广播流进行连接
SingleOutputStreamOperator<String> outputDataStream = logDataStream
.connect(broadcastDataStream)
.process(new BroadcastProcessFunction<TrackLog, UserInfo, String>() {
// 此处广播状态是不可变的,用于关联事件流TrackLog数据
@Override
public void processElement(TrackLog value,
ReadOnlyContext ctx,
Collector<String> out) throws Exception {
// 获取状态
ReadOnlyBroadcastState<String, UserInfo> broadcastState = ctx.getBroadcastState(decriptor);
// 获取用户ID
String userId = value.getUserId();
// 依据ID获取值
UserInfo userInfo = broadcastState.get(userId) ;
if(null != userInfo){
// 拼接字符串
String output = value.toString() + " <-> " + userInfo.toString() ;
out.collect(output);
}
}
// 更新广播流
@Override
public void processBroadcastElement(UserInfo value,
Context ctx,
Collector<String> out) throws Exception {
// a. 获取状态
BroadcastState<String, UserInfo> broadcastState = ctx.getBroadcastState(decriptor);
// b. 加入数据
broadcastState.put(value.getUserId(), value);
}
});
// 4. 数据终端-sink
outputDataStream.printToErr();
// 5. 执行应用
env.execute(DynamicConfigUpdate.class.getSimpleName());
}
}
以上是关于从0到1Flink的成长之路-Flink Action 综合案例-BroadcastState 动态更新的主要内容,如果未能解决你的问题,请参考以下文章