Flink CEP 处理匹配事件

Posted Alienware^

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink CEP 处理匹配事件相关的知识,希望对你有一定的参考价值。

文章目录

将模式应用到流上

将模式应用到事件流上的代码非常简单,只要调用 CEP 类的静态方法.pattern(),将数据流(DataStream)和模式(Pattern)作为两个参数传入就可以了。最终得到的是一个 PatternStream:

DataStream<Event> inputStream = ...
Pattern<Event, ?> pattern = ...
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);

这里的 DataStream,也可以通过 keyBy 进行按键分区得到 KeyedStream,接下来对复杂事件的检测就会针对不同的 key 单独进行了。

模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;如果是处理时间语义,那么所谓先后就是数据到达的顺序。对于时间戳相同或是同时到达的事件,还可以在 CEP.pattern()中传入一个比较器作为第三个参数,用来进行更精确的排序:

// 可选的事件比较器
EventComparator<Event> comparator = ... 
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);

得到 PatternStream 后,接下来要做的就是对匹配事件的检测处理了。

处理匹配事件

PatternStream 的转换操作主要可以分成两种:简单便捷的选择提取(select)操作,和更加通用、更加强大的处理(process)操作。与 DataStream 的转换类似,具体实现也是在调用API 时传入一个函数类:选择操作传入的是一个 PatternSelectFunction,处理操作传入的则是一个 PatternProcessFunction。

匹配事件的选择提取(select)

处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)。

1) PatternSelectFunction
代码中基于 PatternStream 直接调用.select()方法,传入一个PatternSelectFunction 作为参数。

PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);
DataStream<String> result = patternStream.select(new MyPatternSelectFunction());

这 里 的 MyPatternSelectFunction 是 PatternSelectFunction 的 一 个 具 体 实 现 。PatternSelectFunction 是 Flink CEP 提供的一个函数类接口,它会将检测到的匹配事件保存在一个 Map 里,对应的 key 就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在 Map 里的 value 就是一个事件的列表(List)。

下面是 MyPatternSelectFunction 的一个具体实现:

class MyPatternSelectFunction implements PatternSelectFunction <Event, String> 
    @Override
    public String select(Map <String, List <Event>> pattern) throws Exception 
        Event startEvent = pattern.get("start").get(0);
        Event middleEvent = pattern.get("middle").get(0);
        return startEvent.toString() + " " + middleEvent.toString();
    

PatternSelectFunction 里需要实现一个 select()方法,这个方法每当检测到一组匹配的复杂事件时都会调用一次。它以保存了匹配复杂事件的 Map 作为输入,经自定义转换后得到输出信息返回。这里我们假设之前定义的模式序列中,有名为“start”和“middle”的两个个体模式,于是可以通过这个名称从 Map 中选择提取出对应的事件。注意调用 Map 的.get(key)方法后得到的是一个事件的 List;如果个体模式是单例的,那么 List 中只有一个元素,直接调用.get(0)就可以把它取出。

当然,如果个体模式是循环的,List 中就有可能有多个元素了。例如我们在快速上手案例中对连续登录失败检测的改进,我们可以将匹配到的事件包装成 String 类型的报警信息输出,代码如下:

// 1. 定义 Pattern,登录失败事件,循环检测 3 次
Pattern <LoginEvent, LoginEvent> pattern = Pattern. <LoginEvent> begin("fails").where(new SimpleCondition <LoginEvent> () 
    @Override
    public boolean filter(LoginEvent loginEvent) throws Exception 
        return loginEvent.eventType.equals("fail");
    
).times(3).consecutive();
// 2. 将 Pattern 应用到流上,检测匹配的复杂事件,得到一个 PatternStream
PatternStream <LoginEvent> patternStream = CEP.pattern(stream, pattern);
// 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
patternStream.select(new PatternSelectFunction <LoginEvent, String> () 
    @Override
    public String select(Map <String, List <LoginEvent>> map) throws Exception 
        // 只有一个模式,匹配到了 3 个事件,放在 List 中
        LoginEvent first = map.get("fails").get(0);
        LoginEvent second = map.get("fails").get(1);
        LoginEvent third = map.get("fails").get(2);
        return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
    
).print("warning");

运行程序进行测试,会发现结果与之前完全一样。

2)PatternFlatSelectFunction
除此之外,PatternStream 还有一个类似的方法是.flatSelect(),传入的参数是一个PatternFlatSelectFunction。从名字上就能看出,这是 PatternSelectFunction 的“扁平化”版本;内部需要实现一个 flatSelect()方法,它与之前 select()的不同就在于没有返回值,而是多了一个收集器(Collector)参数 out,通过调用out.collet()方法就可以实现多次发送输出数据了。

例如上面的代码可以写成:

// 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
patternStream.flatSelect(new PatternFlatSelectFunction <LoginEvent, String> () 
    @Override
    public void flatSelect(Map <String, List <LoginEvent>> map, Collector <String> out) throws Exception 
        LoginEvent first = map.get("fails").get(0);
        LoginEvent second = map.get("fails").get(1);
        LoginEvent third = map.get("fails").get(2);
        out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp);
    
).print("warning");

可见 PatternFlatSelectFunction 使用更加灵活,完全能够覆盖PatternSelectFunction 的功能。这跟 FlatMapFunction 与 MapFunction 的区别是一样的。

匹配事件的通用处理(process)

自 1.8 版本之后,Flink CEP 引入了对于匹配事件的通用检测处理方式,那就是直接调用PatternStream 的.process()方法,传入一个 PatternProcessFunction。这看起来就像是我们熟悉的处理函数(process function),它也可以访问一个上下文(Context),进行更多的操作。

所以 PatternProcessFunction 功能更加丰富、调用更加灵活,可以完全覆盖其他接口,也就成为了目前官方推荐的处理方式。事实上,PatternSelectFunction 和 PatternFlatSelectFunction在 CEP 内部执行时也会被转换成 PatternProcessFunction。我们可以使用 PatternProcessFunction 将之前的代码重写如下:

// 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
patternStream.process(new PatternProcessFunction < LoginEvent, String > () 
    @Override
    public void processMatch(Map <String, List <LoginEvent>> map, Context ctx, Collector < String > out) throws Exception 
        LoginEvent first = map.get("fails").get(0);
        LoginEvent second = map.get("fails").get(1);
        LoginEvent third = map.get("fails").get(2);
        out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp);
    
).print("warning");

PatternProcessFunction 中必须实现一个 processMatch()方法;这个方法与之前的 flatSelect()类似,只是多了一个上下文 Context 参数。利用这个上下文可以获取当前的时间信息,比如事件的时间戳(timestamp)或者处理时间(processing time);还可以调用.output()方法将数据输出到侧输出流。侧输出流的功能是处理函数的一大特性,我们已经非常熟悉;而在 CEP 中,侧输出流一般被用来处理超时事件。

Gitee上源代码

以上是关于Flink CEP 处理匹配事件的主要内容,如果未能解决你的问题,请参考以下文章

Flink-CEP之NFA

大数据计算引擎之Flink Flink CEP复杂事件编程

Flink 复杂事件处理

Flink CEP - Flink的复杂事件处理

大数据开发-Flink-CEP的主要原理和使用

大数据开发-Flink-CEP的主要原理和使用