“广播状态”为 Flink 的 CEP 库解除了“动态模式”功能的实现是啥意思?

Posted

技术标签:

【中文标题】“广播状态”为 Flink 的 CEP 库解除了“动态模式”功能的实现是啥意思?【英文标题】:What does it mean that "broadcast state" unblocks the implementation of the “dynamic patterns” feature for Flink’s CEP library?“广播状态”为 Flink 的 CEP 库解除了“动态模式”功能的实现是什么意思? 【发布时间】:2018-05-26 11:31:57 【问题描述】:

从 Flink 1.5 发布公告中,我们知道 Flink 现在支持“广播状态”,并且描述了“广播状态为 Flink 的 CEP 库的“动态模式”功能的实现解除阻塞。

这是否意味着目前我们可以在没有 Flink CEP 的情况下使用“广播状态”来实现“动态模式”? 另外我不知道在有或没有广播状态的情况下为 Flink CEP 实现“动态模式”时有什么区别?如果有人可以用代码举例说明差异,我将不胜感激。

=============

操作员 broadcast() 使用 keyed-datastream 更新测试广播数据流

在 Flink 1.4.2 测试后,我发现广播数据流(通过旧的 operatorbroadcast())可以连接到 keyed datastream,下面是测试代码,我们发现所有的控制流事件都广播到所有 operator 实例. 所以看起来旧的 broadcast() 可以实现与新的“广播状态”相同的功能。

public static void ConnectBroadToKeyedStream() throws Exception 
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);

    List<Tuple1<String>>
            controlData = new ArrayList<Tuple1<String>>();
    controlData.add(new Tuple1<String>("DROP"));
    controlData.add(new Tuple1<String>("IGNORE"));
    DataStream<Tuple1<String>> control = env.fromCollection(controlData);//.keyBy(0);

    List<Tuple1<String>>
            dataStreamData = new ArrayList<Tuple1<String>>();
    dataStreamData.add(new Tuple1<String>("data"));
    dataStreamData.add(new Tuple1<String>("DROP"));
    dataStreamData.add(new Tuple1<String>("artisans"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));

    // DataStream<String> data2 = env.fromElements("data", "DROP", "artisans", "IGNORE");
    DataStream<Tuple1<String>> keyedDataStream = env.fromCollection(dataStreamData).keyBy(0);

    DataStream<String> result = control
            .broadcast()
            .connect(keyedDataStream)
            .flatMap(new MyCoFlatMap());
    result.print();
    env.execute();


private static final class MyCoFlatMap
        implements CoFlatMapFunction<Tuple1<String>, Tuple1<String>, String> 
    HashSet blacklist = new HashSet();

    @Override
    public void flatMap1(Tuple1<String> control_value, Collector<String> out) 
        blacklist.add(control_value);
        out.collect("listed " + control_value);
    

    @Override
    public void flatMap2(Tuple1<String> data_value, Collector<String> out) 

        if (blacklist.contains(data_value)) 
            out.collect("skipped " + data_value);
         else 
            out.collect("passed " + data_value);
        
    

下面是测试结果。

1> passed (data)
1> passed (DROP)
3> passed (artisans)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> listed (DROP)
3> listed (IGNORE)
1> listed (DROP)
1> listed (IGNORE)
2> listed (DROP)
2> listed (IGNORE)

https://data-artisans.com/blog/apache-flink-1-5-0-release-announcement

【问题讨论】:

【参考方案1】:

没有广播状态,两个 Flink 数据流不能以有状态的方式一起处理,除非它们以完全相同的方式键控。广播流可以连接到键控流,但是如果您尝试在 RichCoFlatMap 中使用键控状态,例如,这将失败。

经常需要的是能够让一个流具有动态“规则”,这些规则将应用于另一个流上的每个事件,而不管密钥如何。需要一种新的托管 Flink 状态来存储这些规则。使用broadcast state,现在可以以简单的方式完成。

现在有了这个功能,就可以开始在 CEP 中支持动态模式了。

【讨论】:

dataStream.broadcast() 操作者是否无法连接到键控 DataStream ?如果可以,使用广播状态和广播()操作符有什么区别?看来两者的效果是一样的。 所以更准确地说,1)在 Flink1.5 之前,广播流不能是键控流,但是从 Flink1.5 开始,广播流可以通过广播状态成为键控流,2)甚至在 Flink 1.5 之前,广播流仍然可以连接到非键控流,3)键控流无法连接到另一个非键控流,4)但是键控流可以连接到另一个键控流。如果 4 项中有任何一个错误,请纠正我. 我做了测试,发现我可以成功connect()一个广播流和一个keyed流,并将测试代码更新为原始问题,请检查。 是的,我弄错了。可以连接广播流和键控流。不能做的是在 RichCoFlatMap 之类的东西中使用键控状态来存储动态规则。 您能帮忙检查一下这个相关问题***.com/questions/50570605/…吗?【参考方案2】:

这是一个代码示例,它实现了 flink 原始的无参数广播方法和 flink 1.5.0 上新引入的广播状态。 https://gist.github.com/syhily/932e0d1e0f12b3e951236d6c36e5a7ed

据我所知,广播状态可以在没有 flink cep 的情况下实现,就像上面显示的代码一样。

原始DataStreambroadcast 方法将创建DataStream 而不是BroadcastConnectedStream。这将是最初的 coGroup 设计方案。在将度量流与广播规则流连接后,我们可以使用ConnectedStreams 中定义的更多流转换函数。比如keyBy函数,这会使具有相同键的广播流和连接流被processed并粘在同一个并行CoProcessFunction上。所以CoProcessFunction 可以有自己的本地存储。除了从ReadOnlyContext 访问的地图状态之外,流程函数可以在其字段上具有自定义数据结构。

广播状态可以通过broadcast方法和一组MapStateDescriptor来实现,这意味着广播的流可以与其他流多次连接。不同连接的BroadcastConnectedStream 可以通过process 函数中唯一的MapStateDescriptor 共享自己的广播状态。

我认为这些将是带有 on 参数的广播和广播状态之间的主要区别。

【讨论】:

以上是关于“广播状态”为 Flink 的 CEP 库解除了“动态模式”功能的实现是啥意思?的主要内容,如果未能解决你的问题,请参考以下文章

Flink CEP:哪种方法可以为不同类型的事件加入数据流?

Flink 源码:广播流状态源码解析

Flink Broadcast State 实战指南

Flink Broadcast State 实战指南

Flink-状态与容错-Broadcast State--flink1.13

大数据——Flink Broadcast State 广播状态