如何在 flink 中更新 KeyedBroadcastProcessFunction 中的广播状态?
Posted
技术标签:
【中文标题】如何在 flink 中更新 KeyedBroadcastProcessFunction 中的广播状态?【英文标题】:How to update the Broadcast state in KeyedBroadcastProcessFunction in flink? 【发布时间】:2020-06-08 13:15:40 【问题描述】:我是 Flink 的新手,我正在使用 apache flink 进行模式匹配,其中模式列表以广播状态存在,并遍历 processElements 函数中的模式以找到匹配的模式,我正在从数据库中读取这些模式和这是一项准时的活动。下面是我的代码
MapState Descriptor 和 Side 输出流如下
public static final MapStateDescriptor<String, String> ruleDescriptor=
new MapStateDescriptor<String, String>("RuleSet", BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
public final static OutputTag<Tuple2<String, String>> unMatchedSideOutput =
new OutputTag<Tuple2<String, String>>(
"unmatched-side-output")
;
处理函数和广播函数如下:
@Override
public void processElement(Tuple2<String, String> inputValue, ReadOnlyContext ctx,Collector<Tuple2<String,String>> out) throws Exception
for (Map.Entry<String, String> ruleSet: ctx.getBroadcastState(broadcast.patternRuleDescriptor).immutableEntries())
String ruleName = ruleSet.getKey();
//If the rule in ruleset is matched then send output to main stream and break the program
if (this.rule)
out.collect(new Tuple2<>(inputValue.f0, inputValue.f1));
break;
// Writing output to sideout if no rule is matched
ctx.output(Output.unMatchedSideOutput, new Tuple2<>("No Rule Detected", inputValue.f1));
@Override
public void processBroadcastElement(Tuple2<String, String> ruleSetConditions, Context ctx, Collector<Tuple2<String,String>> out) throws Exception ctx.getBroadcastState(broadcast.ruleDescriptor).put(ruleSetConditions.f0,
ruleSetConditions.f1);
主要功能如下
public static void main(String[] args) throws Exception
//Initiate a datastream environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Reads incoming data for upstream
DataStream<String> incomingSignal =
env.readTextFile(....);
//Reads the patterns available in configuration file
DataStream<String> ruleStream =
env.readTextFile();
//Generate a key,value pair of set of patterns where key is pattern name and value is pattern condition
DataStream<Tuple2<String, String>> ruleStream =
rawPatternStream.flatMap(new FlatMapFunction<String, Tuple2<String, String>>()
@Override
public void flatMap(String ruleCondition, Collector<Tuple2<String, String>> out) throws Exception
String rules[] = ruleCondition.split[","];
out.collect(new Tuple2<>(rules[0], rules[1]));
);
//Broadcast the patterns to all the flink operators which will be stored in flink operator memory
BroadcastStream<Tuple2<String, String>>ruleBroadcast = ruleStream.broadcast(ruleDescriptor);
/*Creating keystream based on sourceName as key */
DataStream<Tuple2<String, String>> matchSignal =
incomingSignal.map(new MapFunction<String, Tuple2<String, String>>()
@Override
public Tuple2<String, String> map(String incomingSignal) throws Exception
String sourceName = ingressSignal.split[","][0]
return new Tuple2<>(sourceName, incomingSignal);
).keyBy(0).connect(ruleBroadcast).process(new KeyedBroadCastProcessFunction());
matchSignal.print("RuleDetected=>");
我有几个问题
1) 目前我正在从数据库中读取规则,当 flink 作业在集群中运行时如何更新广播状态,如果我从 kafka 主题获取新规则集,我如何在 processBroadcast 方法中更新广播状态在 KeyedBroadcasrProcessFunction 2)当broadcast state更新时,是否需要重启flink job?
请帮我解决以上问题
【问题讨论】:
嘿,请添加一些代码示例,以便人们更好地理解问题。 @DominikWosiński 我用我的代码更新了这个问题,如果规则有任何变化,你能建议我如何更新广播状态 【参考方案1】:设置或更新广播状态的唯一方法是使用BroadcastProcessFunction
或KeyedBroadcastProcessFunction
的processBroadcastElement
方法。您需要做的就是调整您的应用程序以从流式源中流式传输规则,而不是从文件中读取一次。
广播状态是一个哈希映射。如果您的广播流包含一个新的键/值对,它使用与早期广播事件相同的键,那么新值将替换旧值。否则你会得到一个全新的条目。
如果您将readFile 与FileProcessingMode.PROCESS_CONTINUOUSLY
一起使用,则每次修改文件时,都会重新摄取其全部内容。您可以使用该机制来更新您的规则集。
【讨论】:
我更新了我的问题,实际上我有一个新要求,我将在数据库中有一组规则,这些规则将被广播,并且将来在这种情况下,新规则将通过 kafka 主题发送我怎么能用新的模式更新我的广播状态中的模式我知道你在上面提到我需要在 processBroadCast 方法中进行,但我不知道该怎么做我是 flink 新手,你能指出任何示例或只是一个概述我该怎么做才对我很有帮助 这里有一个示例:github.com/ververica/flink-training-exercises/blob/master/src/… 这是文档中示例的一个小变体 (ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/…)。以上是关于如何在 flink 中更新 KeyedBroadcastProcessFunction 中的广播状态?的主要内容,如果未能解决你的问题,请参考以下文章
在流式系统中如何引入Watermark支持:以Pravega和Flink为例
State Processor API:如何读写和修改 Flink 应用程序的状态
State Processor API:如何读写和修改 Flink 应用程序的状态
从0到1Flink的成长之路-Flink Action 综合案例-BroadcastState 动态更新