FlinkCEP - Flink的复杂事件处理
Posted coppher
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkCEP - Flink的复杂事件处理相关的知识,希望对你有一定的参考价值。
版本说明
本文中以Flink 1.16.1 版本讲解说明
Note:Flink1.16.1版本相较于之前版本增强的within函数,
支持模式序列中相邻事件间的超时定义,以前版本只支持模式序列中第一个事件到最后一个事件之间的最大时间间隔。
快速开始
基于Kafka connecter 流处理job还需要引入:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>$flink.version</version>
</dependency>
基于Flink DataStream作业添加CEP Maven依赖
<properties>
<flink.version>1.16.1</flink.version>
</properties>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>$flink.version</version>
</dependency>
使用Pattern API编写CEP例子
DataStream<Event> input = ...;
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where( //"start" 为模式的名称,模式名称规则查看模式名称约束条件
new SimpleCondition<Event>() // 定义了一个简单条件
@Override
public boolean filter(Event event)
return event.getId() == 42; //开始事件为id=42的事件,配到一个(未设置times时默认为匹配到1次,进入下一个模式)
).next("middle").subtype(SubEvent.class).where( // subtype方法限定middle对应事件子类为SubEvent
new SimpleCondition<SubEvent>()
@Override
public boolean filter(SubEvent subEvent)
return subEvent.getVolume() >= 10.0; // 事件 volume属性大于等于10.0
).followedBy("end").where(
new SimpleCondition<Event>()
@Override
public boolean filter(Event event)
return event.getName().equals("end"); // 最后一个事件
);
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.process(
new PatternProcessFunction<Event, Alert>()
@Override
public void processMatch(
Map<String, List<Event>> pattern,
Context ctx,
Collector<Alert> out) throws Exception
// 对于匹配成功的事件存储在 Map<String, List<Event>> pattern的Map中,key为模式的名称,这里key值可能为start,middle,end,因为要支持循环模式的量词,所以模式对应的事件定义为List
// 对应匹配超时(使用了within条件)可以使用PatternTimeoutFunction来处理,下文中再讲解。
out.collect(createAlertFrom(pattern));
);
模式API
模式API可以实现在连续的事件中,匹配到符合指定特征的子序列。每个复杂的模式序列由多个简单的模式组合而成。
每个模式必须有一个独一无二的名字,你可以在后面使用它来识别匹配到的事件。
模式的名字不能包含字符":".
Flink CEP API中对模式划分为三种:单个模式,组合模式、模式组,下面对三种模式详细讲解
单个模式
单个模式,又分为单例模式、循环模式,单例模式只接受一个事件,循环模式可以接受多个事件。 在模式匹配表达式中,模式"a b+ c? d"(或者"a",后面跟着一个或者多个"b",再往后可选择的跟着一个"c",最后跟着一个"d"), a,c?,和 d都是单例模式,b+是一个循环模式。默认情况下,模式都是单例的,你可以通过使用量词(Quantifier)把它们转换成循环模式。 每个模式可以有一个或者多个条件来决定它接受哪些事件。
量词
在FlinkCEP中,你可以通过这些方法指定循环模式:pattern.oneOrMore(),指定期望一个给定事件出现一次或者多次的模式(例如前面提到的b+模式); pattern.times(#ofTimes),指定期望一个给定事件出现特定次数的模式,例如出现4次a; pattern.times(#fromTimes, #toTimes),指定期望一个给定事件出现次数在一个最小值和最大值中间的模式,比如出现2-4次a。
你可以使用pattern.greedy()方法让循环模式变成贪心的,但现在还不能让模式组贪心。 你可以使用pattern.optional()方法让所有的模式变成可选的,不管是否是循环模式。
例如:对一个名称为start的模式,可以指定如下量词;
// 期望出现4次
start.times(4);
// 期望出现0或者4次
start.times(4).optional();
// 期望出现2、3或者4次
start.times(2, 4);
// 期望出现2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).greedy();
// 期望出现0、2、3或者4次
start.times(2, 4).optional();
// 期望出现0、2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).optional().greedy();
// 期望出现1到多次
start.oneOrMore();
// 期望出现1到多次,并且尽可能的重复次数多
start.oneOrMore().greedy();
// 期望出现0到多次
start.oneOrMore().optional();
// 期望出现0到多次,并且尽可能的重复次数多
start.oneOrMore().optional().greedy();
// 期望出现2到多次
start.timesOrMore(2);
// 期望出现2到多次,并且尽可能的重复次数多
start.timesOrMore(2).greedy();
// 期望出现0、2或多次
start.timesOrMore(2).optional();
// 期望出现0、2或多次,并且尽可能的重复次数多
start.timesOrMore(2).optional().greedy();
特别需要注意的是:
1. optional不能应用于 not 的pattern
2. optional不能应用于 GroupPattern
3. 前一个Pattern如果设置了greedy(),后一个Pattern不能使用optional
条件
条件即Condition,API已实现了一些基础的Condition:
条件用于判定事件是否被当前这个模式接收,可通过pattern.where()、pattern.or()或者pattern.until()方法指定条件,条件可使用已实现的Condition也可以集成IterativeCondition或者SimpleCondition自定义条件。
迭代条件(IterativeCondition): 这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。
下面是一个迭代条件的代码,它接受"middle"模式下一个事件的名称开头是"foo", 并且前面已经匹配到的事件加上这个事件的价格小于5.0。 迭代条件非常强大,尤其是跟循环模式结合使用时。
middle.oneOrMore()
.subtype(SubEvent.class)
.where(new IterativeCondition<SubEvent>()
@Override
public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception
if (!value.getName().startsWith("foo"))
return false;
double sum = value.getPrice();
// ctx.getEventsForPattern还可以获取前面的pattern,比如 ctx.getEventsForPattern("start")
for (Event event : ctx.getEventsForPattern("middle"))
sum += event.getPrice();
return Double.compare(sum, 5.0) < 0;
);
调用ctx.getEventsForPattern(…)可以获得所有前面已经接受作为可能匹配的事件。 调用这个操作的代价可能很小也可能很大,所以在实现你的条件时,尽量少使用它。
简单条件:和它的名称一样,实现非常简单,只是判断单个事件本身的属性是否满足某些条件,类似list.stream.filter() 的功能。
组合条件:和SQL中的 and 、or一样, and条件使用多个where()方法连续调用,or,调用or()方法即可,示例如下
pattern.where(new SimpleCondition<Event>()
@Override
public boolean filter(Event value)
return ...; // 一些判断条件
).where(new SimpleCondition<Event>()
@Override
public boolean filter(Event value)
return ...; // AND的另一些判断条件
).or(new SimpleCondition<Event>()
@Override
public boolean filter(Event value)
return ...; // OR的另一些判断条件
);
停止条件: 如果使用循环模式(oneOrMore()和oneOrMore().optional()),你可以指定一个停止条件,例如,接受事件的值大于5直到值的和小于50。方法名:until(condition);
条件API使用实例汇总:
// where()、 or()使用
pattern.where(new IterativeCondition<Event>()
@Override
public boolean filter(Event value, Context ctx) throws Exception
return ...; // 一些判断条件
).or(new IterativeCondition<Event>()
@Override
public boolean filter(Event value, Context ctx) throws Exception
return ...; // 替代条件
);
// until 使用方法
pattern.oneOrMore().until(new IterativeCondition<Event>()
@Override
public boolean filter(Event value, Context ctx) throws Exception
return ...; // 替代条件
);
// subtype(clazz)
pattern.subtype(SubEvent.class);
//默认内部为松散连续, 推荐使用 until()或者 within()来清理状态。
pattern.oneOrMore();
//timesOrMore(n),匹配结果: >=n 次
pattern.timesOrMore(2);
//timesOrMore(n),匹配结果: =n 次
pattern.times(2);
//optional(), 匹配结果 >=0次
pattern.oneOrMore().optional();
// 尽可能多低匹配,n?次,建议结合within来清理状态,和within结合后,匹配结果:>=1;
pattern.oneOrMore().greedy();
组合模式
上文中已讲解了单个模式,下面对组合模式进行说明。
定义多个模式,按连续性语义设定,组合完整的模式序列。
先定义一个初始模式作为开头:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
接下来,你可以增加更多的模式到模式序列中并指定它们之间所需的连续条件。FlinkCEP支持事件之间如下形式的连续策略:
严格连续: 期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。
松散连续: 忽略匹配的事件之间的不匹配的事件。
不确定的松散连续: 更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。
可以使用下面的方法来指定模式之间的连续策略:
next(),指定严格连续,
followedBy(),指定松散连续,
followedByAny(),指定不确定的松散连续。
或者
notNext(),如果不想后面直接连着一个特定事件
notFollowedBy(),如果不想一个特定事件发生在两个事件之间的任何地方。
- 如果模式序列没有定义时间约束,则不能以 notFollowedBy() 结尾。
- 一个 NOT 模式前面不能是可选的模式。
使用方法入下所示
// 严格连续
Pattern<Event, ?> strict = start.next("middle").where(...);
// 松散连续
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// 不确定的松散连续
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// 严格连续的NOT模式
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// 松散连续的NOT模式
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
松散连续意味着跟着的事件中,只有第一个可匹配的事件会被匹配上,而不确定的松散连接情况下,有着同样起始的多个匹配会被输出。 举例来说,模式"a b",给定事件序列"a",“c”,“b1”,“b2”,会产生如下的结果:
"a"和"b"之间严格连续: (没有匹配),"a"之后的"c"导致"a"被丢弃。
“a"和"b"之间松散连续: a b1,松散连续会"跳过不匹配的事件直到匹配上的事件”。
"a"和"b"之间不确定的松散连续: a b1, a b2,这是最常见的情况。
也可以为模式定义一个有效时间约束。 例如,你可以通过pattern.within()方法指定一个模式应该在10秒内发生。 这种时间模式支持处理时间和事件时间.
一个模式序列只能有一个时间限制。如果限制了多个时间在不同的单个模式上,会使用最小的那个时间限制。
注意定义过时间约束的模式允许以 notFollowedBy() 结尾。 例如,可以定义如下的模式:
Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>()
@Override
public boolean filter(Event value) throws Exception
return value.getName().equals("a");
).notFollowedBy("end").where(new SimpleCondition<Event>()
@Override
public boolean filter(Event value) throws Exception
return value.getName().equals("b");
).within(Time.seconds(10));
循环模式中的连续性
你可以在循环模式中使用和前面章节讲过的同样的连续性。 连续性会被运用在被接受进入模式的事件之间。 用这个例子来说明上面所说的连续性,一个模式序列"a b+ c"(“a"后面跟着一个或者多个(不确定连续的)“b”,然后跟着一个"c”) 输入为"a",“b1”,“d1”,“b2”,“d2”,“b3”,“c”,输出结果如下:
严格连续: a b1 c, a b2 c, a b3 c - 没有相邻的 “b” 。
松散连续: a b1 c,a b1 b2 c,a b1 b2 b3 c,a b2 c,a b2 b3 c,a b3 c - "d"都被忽略了。
不确定松散连续: a b1 c,a b1 b2 c,a b1 b3 c,a b1 b2 b3 c,a b2 c,a b2 b3 c,a b3 c - 注意a b1 b3 c,这是因为"b"之间是不确定松散连续产生的。
对于循环模式(例如oneOrMore()和times())),默认是松散连续。如果想使用严格连续,你需要使用consecutive()方法明确指定, 如果想使用不确定松散连续,你可以使用allowCombinations()方法。
// 循环模式,模式内部指定为间隔连续
pattern.oneOrMore().consecutive()
pattern.times().consecutive()
// 循环模式,模式内部指定为不确定松散连续
pattern.oneOrMore().allowCombinations()
pattern.times().allowCombinations()
模式组
也可以定义一个模式序列作为begin,followedBy,followedByAny和next的条件。这个模式序列在逻辑上会被当作匹配的条件, 并且返回一个GroupPattern,可以在GroupPattern上使用oneOrMore(),times(#ofTimes), times(#fromTimes, #toTimes),optional(),consecutive(),allowCombinations()。
模式组:GroupPattern,
模式组使用示例:
GroupPattern<Event, ?> start = GroupPattern.begin(
Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);
// 严格连续
GroupPattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);
// 松散连续
GroupPattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();
// 不确定松散连续
GroupPattern<Event, ?> nonDetermin = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
需要注意的是模式组不支持一下操作:
GroupPattern 不支持 where()
GroupPattern 不支持 or()
GroupPattern 不支持 subtype()
匹配后跳过策略
对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。 有五种跳过策略,如下:
NO_SKIP: 每个成功的匹配都会被输出。
SKIP_TO_NEXT: 丢弃以相同事件开始的所有部分匹配。
SKIP_PAST_LAST_EVENT: 丢弃起始在这个匹配的开始和结束之间的所有部分匹配。
SKIP_TO_FIRST: 丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。
SKIP_TO_LAST: 丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部
以上是关于FlinkCEP - Flink的复杂事件处理的主要内容,如果未能解决你的问题,请参考以下文章