Flink 复杂事物处理
Posted ronnieyuan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 复杂事物处理相关的知识,希望对你有一定的参考价值。
简介
- FlinkCEP是在Flink之上实现的复杂事件处理(CEP)库。
- 它允许你在无界的事件流中检测事件模式,让你有机会掌握数据中重要的事项。
- Flink CEP 首先需要用户创建定义一个个pattern,然后通过链表将由前后逻辑关系的pattern串在一起,构成模式匹配的逻辑表达。
- 可以应用的场景: 直播平台异常检测(扫X), 顺风车路径异常检测(XD) 等等.....
Maven坐标
- 我下的是1.9.1
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.9.1</version>
</dependency>
- 需要注意的是: 应用模式匹配的DataStream中的事件必须实现正确的equals()和hashCode()方法,因为FlinkCEP使用它们来比较和匹配事件。
入门 demo
场景: 用户登录, 在整个模式匹配的规则在5秒内,如果连续两次登录失败,则发出警告。。
代码:
LoginEvent
package com.ronnie.flink.demo.cep; public class LoginEvent { private String userId;//用户ID private String ip;//登录IP private String type;//登录类型 public LoginEvent() { } public LoginEvent(String userId, String ip, String type) { this.userId = userId; this.ip = ip; this.type = type; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public String getType() { return type; } public void setType(String type) { this.type = type; } @Override public String toString() { return "LoginEvent{" + "userId='" + userId + ''' + ", ip='" + ip + ''' + ", type='" + type + ''' + '}'; } }
LoginWarning
package com.ronnie.flink.demo.cep; public class LoginWarning { private String userId; private String type; private String ip; public LoginWarning() { } public LoginWarning(String userId, String type, String ip) { this.userId = userId; this.type = type; this.ip = ip; } @Override public String toString() { return "LoginWarning{" + "userId='" + userId + ''' + ", type='" + type + ''' + ", ip='" + ip + ''' + '}'; } }
LoginWarningDemo
package com.ronnie.flink.demo.cep; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternSelectFunction; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import java.util.Arrays; import java.util.List; import java.util.Map; /** * 整个模式匹配的规则在5秒内,如果连续两次登录失败,则发出警告。。 */ public class LoginFailWarningDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource loginEventStream = env.fromCollection(Arrays.asList( new LoginEvent("1", "192.168.0.1", "fail"), new LoginEvent("1", "192.168.0.2", "fail"), new LoginEvent("1", "192.168.0.3", "fail"), new LoginEvent("2", "192.168.10,10", "fail"), new LoginEvent("2", "192.168.10,10", "success") )); // 开启一个模式匹配规则 Pattern<LoginEvent, LoginEvent> begin = Pattern.begin("begin"); // 模式匹配的条件 Pattern<LoginEvent, LoginEvent> p1 = begin.where(new IterativeCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception { // 过滤掉非fail的, 只返回fail的 return loginEvent.getType().equals("fail"); } }); // 追加一个新的模式。 匹配事件必须直接跟着先前的匹配事件(严格连续性) Pattern<LoginEvent, LoginEvent> next = p1.next("next"); // 新模式的匹配条件 Pattern<LoginEvent, LoginEvent> p2 = next.where(new IterativeCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception { // 第二次匹配 return loginEvent.getType().equals("fail"); } }); // 定义事件序列进行模式匹配的最大间隔。如果未完成的事件序列超过此时间, 则将其丢弃: Pattern<LoginEvent, LoginEvent> p3 = p2.within(Time.seconds(5)); // 注意导的是窗口的时间 PatternStream patternStream = CEP.pattern(loginEventStream.keyBy("userId"), p3); patternStream.select(new PatternSelectFunction<LoginEvent, LoginWarning>() { @Override public LoginWarning select(Map<String, List<LoginEvent>> pattern) throws Exception { List<LoginEvent> begin01 = pattern.get("begin"); System.out.println("------- Begin List ------"); begin01.stream().forEach(System.out::println); System.out.println("------- next list ------"); List<LoginEvent> next01 = pattern.get("next"); next01.stream().forEach(System.out::println); return new LoginWarning(next01.get(0).getUserId(), next01.get(0).getType(), next01.get(0).getIp()); } }); loginEventStream.printToErr(); env.execute(); } }
Pattern API
- Pattern API允许你定义要从输入流中提取的复杂模式序列。
- 每个复杂模式序列都是由多个简单模式组成,即寻找具有相同属性的单个事件的模式。
- 可以将模式序列视为此类模式的结构图,基于用户指定的条件从一个模式转换到下一个模式,例如, event.getName().equals("start")。
- 匹配是一系列输入事件,通过一系列有效的模式转换访问复杂模式图中的所有模式。
- 注意点:
- 每个模式必须具有唯一的名称, 以便后续可以使用该名称来标识匹配的事件。
- 注意模式名称不能包含字符“:”
单个模式
Pattern可以是单单个,也可以是循环模式。
单个模式接受单个事件,而循环模式可以接受多个事件。
在模式匹配符号中,模式“a b + c?d”(或“a”,后跟一个或多个“b”,可选地后跟“c”,后跟“d”),a,c ?,和d是单例模式,而b +是循环模式。 [吐槽一下: 搞得跟正则似的]
默认情况下,模式是单个模式,可以使用Quantifiers将其转换为循环模式。
每个模式可以有一个或多个条件,基于它接受事件。
1.1. Quantifiers
在FlinkCEP中,您可以使用以下方法指定循环模式:pattern.oneOrMore(),用于期望一个或多个事件发生的模式(例如之前提到的b +);
pattern.times(#ofTimes), 用于期望给定类型事件的特定出现次数的模式
patterntimes(#fromTimes,#toTimes),用于期望给定类型事件的最小出现次数和最大出现次数的模式
可以使用pattern.greedy()方法使循环模式变得贪婪,但是还不能使组模式变得贪婪。
可以使用pattern.optional()方法使得所有模式,循环与否,变为可选。
```java // expecting 4 occurrences 出现4次 start.times(4); // expecting 0 or 4 occurrences 出现 0 或 4 次 start.times(4).optional(); // expecting 2, 3 or 4 occurrences 出现 2, 3, 4 次 start.times(2, 4); // expecting 2, 3 or 4 occurrences and repeating as many as possible // 出现 2, 3, 4次 并重复尽可能多次 start.times(2, 4).greedy(); // expecting 0, 2, 3 or 4 occurrences 出现0, 2, 3, 4次 start.times(2, 4).optional(); // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible // start.times(2, 4).optional().greedy(); // expecting 1 or more occurrences 出现一次或多次 start.oneOrMore(); // expecting 1 or more occurrences and repeating as many as possible // 出现一次或多次 并 重复多次 start.oneOrMore().greedy(); // expecting 0 or more occurrences 出现0或多次 start.oneOrMore().optional(); // expecting 0 or more occurrences and repeating as many as possible // 出现 0 或 多次并重复尽可能多次 start.oneOrMore().optional().greedy(); // expecting 2 or more occurrences 出现两次或多次 start.timesOrMore(2); // expecting 2 or more occurrences and repeating as many as possible // 出现两次或多次并尽可能重复多次 start.timesOrMore(2).greedy(); // expecting 0, 2 or more occurrences and repeating as many as possible //出现0, 2或多次并重复尽可能多次 start.timesOrMore(2).optional().greedy(); ```
1.2. Conditions
在每个模式中,从一个模式转到下一个模式,可以指定其他条件。您可以将使用下面这些条件:
传入事件的属性,例如其值应大于5,或大于先前接受的事件的平均值。
匹配事件的连续性,例如检测模式a,b,c,序列中间不能有任何非匹配事件。
1.3. Conditions on Properties
- 可以通过pattern.where(),pattern.or()或pattern.until()方法指定事件属性的条件。 条件可以是IterativeConditions或SimpleConditions。
- 迭代条件:
- 可以指定一个条件,该条件基于先前接受的事件的属性或其子集的统计信息来接受后续事件。
- 简单条件:
- 这种类型的条件扩展了前面提到的IterativeCondition类,并且仅根据事件本身的属性决定是否接受事件。
- 还可以通过pattern.subtype(subClass)方法将接受事件的类型限制为初始事件类型的子类型。
- 组合条件:
- 可以将子类型条件与其他条件组合使用。 这适用于所有条件。 您可以通过顺序调用where()来任意组合条件。
- 最终结果将是各个条件的结果的逻辑AND。 要使用OR组合条件,可以使用or()方法
- 停止条件:
- 在循环模式(oneOrMore()和oneOrMore().optional())的情况下,还可以指定停止条件,例如: 接受值大于5的事件,直到值的总和小于50。
- 连续事件条件:
- FlinkCEP支持事件之间以下形式进行连续:
- 严格连续性:希望所有匹配事件一个接一个地出现,中间没有任何不匹配的事件。
- 宽松连续性:忽略匹配的事件之间出现的不匹配事件。 不能忽略两个事件之间的匹配事件。
- 非确定性轻松连续性:进一步放宽连续性,允许忽略某些匹配事件的其他匹配。
- 对于循环模式(例如oneOrMore()和times()),默认是宽松的连续性。
- 想要严格的连续性,必须使用consecutive()显式指定它。
- 如果你想要非确定性的松弛连续性,你可以使用allowCombinations()方法。
- 单个循环模式中的连续性,并且需要在该上下文中理解consecutive()和allowCombinations()。
1.4 API 简介
- where(condition)
- 定义当前模式的条件。 为了匹配模式,事件必须满足条件。 多个连续的where(),其条件为AND:
or(condition)
- 添加与现有条件进行OR运算的新条件。 只有在至少通过其中一个条件时,事件才能匹配该模式:
until(condition)
- 指定循环模式的停止条件。 意味着如果匹配给定条件的事件发生,则不再接受该模式中的事件。
- 仅适用于oneOrMore()
- 注意:它允许在基于事件的条件下清除相应模式的状态。
subtype(subClass)
- 定义当前模式的子类型条件。 如果事件属于此子类型,则事件只能匹配该模式:
pattern.subtype(SubEvent.class);
oneOrMore()
指定此模式至少发生一次匹配事件。
默认情况下, 使用宽松的内部连续性。
注意点: 建议使用until() 或 within() 来启用状态清除
pattern.oneOrMore().until(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ... // alternative condition 可替代的条件 } });
timesOrMore(#times)
指定此模式至少需要 #times 次出现匹配事件。
默认情况下, 使用宽松的内部连续性(在后续事件之间)。
pattern.timesOrMore(2);
pattern.time(#ofTimes)
指定此模式需要匹配事件的确切出现次数。
默认情况下, 使用宽松的内部连续性 (在后续事件之间)。
pattern.times(2);
times(#fromTimes, #toTimes)
指定此模式期望在匹配事件的#fromTimes 和 #toTimes次之间出现。
默认情况下, 使用宽松的内部连续性。
pattern.times(2, 4);
optional()
指定此模式是可选的,即有可能根本不会发生。 这适用于所有上述量词。
pattern.oneOrMore().optional();
greedy()
指定此模式是贪婪的,即它将尽可能多地重复。 这仅适用于quantifiers,目前不支持组模式。
pattern.oneOrMore().greedy();
consecutive()
与oneOrMore()和times()一起使用并在匹配事件之间强加严格的连续性,即任何不匹配的元素都会中断匹配。
如果不使用,则使用宽松的连续性(如followBy())。
例如以下模式:
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }) .followedBy("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).oneOrMore().consecutive() .followedBy("end1").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } });
allowCombinations()
与oneOrMore()和times()一起使用,并在匹配事件之间强加非确定性宽松连续性(如 followedByAny()
如果不应用,则使用宽松的连续性(如followBy())。
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }) .followedBy("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).oneOrMore().allowCombinations() .followedBy("end1").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } });
组合模式
模式序列必须以初始模式开始,如下所示:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
可以通过指定它们之间所需的连续条件,为模式序列添加更多模式。
可以使用:
next() 对应 严格, followedBy() 对应 宽松连续性 followedByAny() 对应 非确定性宽松连续性
或:
notNext() 如果不希望一个事件类型紧接着另一个类型出现。 notFollowedBy() 不希望两个事件之间任何地方出现该事件。 注意 模式序列不能以notFollowedBy()结束。 注意 NOT模式前面不能有可选模式
// strict contiguity 强连续性 Pattern<Event, ?> strict = start.next("middle").where(...); // relaxed contiguity 松连续性 Pattern<Event, ?> relaxed = start.followedBy("middle").where(...); // non-deterministic relaxed contiguity 不可确认的松连续性 Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...); // NOT pattern with strict contiguity 非模式强连续性 Pattern<Event, ?> strictNot = start.notNext("not").where(...); // NOT pattern with relaxed contiguity 非模式松连续性 Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
宽松连续性指的是仅第一个成功匹配的事件会被匹配到,然而非确定性宽松连续性,相同的开始会有多个匹配结果发出。距离,如果一个模式是"a b",给定输入序列是"a c b1 b2"。对于不同连续性会有不同输出。
a和b之间严格连续性,将会返回{},也即是没有匹配。因为c的出现导致a,被抛弃了。
a和b之间宽松连续性,返回的是{a,b1},因为宽松连续性将会抛弃为匹配成功的元素,直至匹配到下一个要匹配的事件
a和b之间非确定性宽松连续性,返回的是{a,b1},{a,b2}。
也可以为模式定义时间约束。 例如,可以通过pattern.within()方法定义模式应在10秒内发生。 时间模式支持处理和事件时间。
注意: 模式序列只能有一个时间约束。 如果在不同的单独模式上定义了多个这样的约束,则应用最小的约束。
next.within(Time.seconds(10));
- 可以为begin,followBy,followByAny和next定义一个模式序列作为条件。
- 模式序列将被逻辑地视为匹配条件,而且将返回GroupPattern并且 可对GroupPattern使用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),consecutive(), allowCombinations()等方法。
PatternPatte <Event, ?> start = Pattern.begin( Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...) ); // strict contiguity Pattern<Event, ?> strict = start.next( Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...) ).times(3); // relaxed contiguity Pattern<Event, ?> relaxed = start.followedBy( Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...) ).oneOrMore(); // non-deterministic relaxed contiguity Pattern<Event, ?> nonDetermin = start.followedByAny( Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...) ).optional();
API
begin(#name)
- 定义一个开始模式
Pattern<Event, ?> start = Pattern.<Event>begin("start");
begin(#pattern_sequence)
- 定义一个开始模式
Pattern<Event, ?> start = Pattern.<Event>begin( Pattern.<Event>begin("start").where(...).followedBy("middle").where(...) );
next(#name)
- 追加一个新的模式。匹配事件必须直接跟着先前的匹配事件(严格连续性):
Pattern<Event, ?> next = start.next("middle");
next(#pattern_sequence)
- 追加一个新的模式。匹配事件必须直接接着先前的匹配事件(严格连续性):
Pattern<Event, ?> next = start.next( Pattern.<Event>begin("start").where(...).followedBy("middle").where(...) );
followedBy(#name)
- 追加加新模式。 匹配事件和先前匹配事件(宽松连续)之间可能发生其他非匹配事件:
Pattern<Event, ?> followedBy = start.followedBy("middle")
- followedBy(#pattern_sequence)
- 追加新模式。 匹配事件和先前匹配事件(宽松连续)之间可能发生其他非匹配事件:
Pattern<Event, ?> followedBy = start.followedBy( Pattern.<Event>begin("start").where(...).followedBy("middle").where(...) );
- followedByAny(#name)
- 添加新模式。 匹配事件和先前匹配事件之间可能发生其他事件,并且将针对每个备选匹配事件(非确定性放松连续性)呈现替代匹配:
Pattern<Event, ?> followedByAny = start.followedByAny("middle");
- followedByAny(#pattern_sequence)
- 添加新模式。 匹配事件和先前匹配事件之间可能发生其他事件,并且将针对每个备选匹配事件(非确定性放松连续性)呈现替代匹配:
Pattern<Event, ?> followedByAny = start.followedByAny( Pattern.<Event>begin("start").where(...).followedBy("middle").where(...) );
- notNext()
- 添加新的否定模式。 匹配(否定)事件必须直接跟着先前的匹配事件(严格连续性)才能丢弃部分匹配:
Pattern<Event, ?> notNext = start.notNext("not");
- notFollowedBy()
- 追加一个新的否定模式匹配。即使在匹配(否定)事件和先前匹配事件(宽松连续性)之间发生其他事件,也将丢弃部分匹配事件序列:
Pattern<Event, ?> notFollowedBy = start.notFollowedBy("not");
- within(time)
- 定义事件序列进行模式匹配的最大时间间隔。 如果未完成的事件序列超过此时间,则将其丢弃:
pattern.within(Time.seconds(10));
匹配后的跳过策略
对于给定模式,可以将同一事件分配给多个成功匹配。 要控制将分配事件的匹配数,需要指定名为AfterMatchSkipStrategy的跳过策略。 跳过策略有四种类型,如下所示:
- NO_SKIP:将发出每个可能的匹配。
- SKIP_PAST_LAST_EVENT:丢弃包含匹配事件的每个部分匹配。
- SKIP_TO_FIRST:丢弃包含PatternName第一个之前匹配事件的每个部分匹配。
SKIP_TO_LAST:丢弃包含PatternName最后一个匹配事件之前的每个部分匹配。
注意点:
使用SKIP_TO_FIRST和SKIP_TO_LAST跳过策略时,还应指定有效的PatternName。
例如,对于给定模式a b {2}和数据流ab1,ab2,ab3,ab4,ab5,ab6,这四种跳过策略之间的差异如下:
要指定要使用的跳过策略,只需调用以下命令创建AfterMatchSkipStrategy:
使用方法:
AfterMatchSkipStrategy skipStrategy = ... Pattern.begin("patternName", skipStrategy);
检测模式 - Detecting Patterns
指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配。 要针对模式序列运行事件流,必须创建PatternStream。
给定输入流 input,模式 pattern 和可选的比较器 comparator,用于在EventTime的情况下对具有相同时间戳的事件进行排序或在同一时刻到达,通过调用以下命令创建PatternStream
DataStream<Event> input = ... Pattern<Event, ?> pattern = ... EventComparator<Event> comparator = ... // optional PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
根据实际情况,创建的流可以是有key,也可以是无key的。
注意点: 在无key的流上使用模式,将导致job的并行度为1。
Selecting from Patterns
获得PatternStream后,您可以通过select或flatSelect方法从检测到的事件序列中进行查询。
select()方法需要PatternSelectFunction的实现。 PatternSelectFunction具有为每个匹配事件序列调用的select方法。
它以Map <String,List >的形式接收匹配,其中key是模式序列中每个模式的名称,值是该模式的所有已接受事件的列表(IN是输入元素的类型)。
给定模式的事件按时间戳排序。 返回每个模式的接受事件列表的原因是当使用循环模式(例如oneToMany()和times())时,对于给定模式可以接受多个事件。
选择函数只返回一个结果。
class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> { @Override public OUT select(Map<String, List<IN>> pattern) { IN startEvent = pattern.get("start").get(0); IN endEvent = pattern.get("end").get(0); return new OUT(startEvent, endEvent); } }
- PatternFlatSelectFunction类似于PatternSelectFunction,唯一的区别是它可以返回任意数量的结果。 为此,select方法有一个额外的Collector参数,用于将输出元素向下游转发。
class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> { @Override public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) { IN startEvent = pattern.get("start").get(0); IN endEvent = pattern.get("end").get(0); for (int i = 0; i < startEvent.getValue(); i++ ) { collector.collect(new OUT(startEvent, endEvent)); } } }
处理超时部分模式
每当模式具有通过within关键字附加的时间窗口长度时,部分事件序列可能因为超出时间窗口长度而被丢弃。
为了对这些超时的部分匹配作出相应的处理,select和flatSelect API调用允许指定超时处理程序。
超时处理程序接收到目前为止由模式匹配的所有事件,以及检测到超时时的时间戳。
为了处理部分模式,select和flatSelect API提供了一个带参数的重载版本
PatternTimeoutFunction/ PatternFlatTimeoutFunction。
OutputTag 超时的匹配将会在其中返回。
PatternSelectFunction / PatternFlatSelectFunction。
PatternStreamPatte <Event> patternStream = CEP.pattern(input, pattern); OutputTag<String> outputTag = new OutputTag<String>("side-output"){}; SingleOutputStreamOperator<ComplexEvent> result = patternStream.select( new PatternTimeoutFunction<Event, TimeoutEvent>() {...}, outputTag, new PatternSelectFunction<Event, ComplexEvent>() {...} ); DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag); SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect( new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...}, outputTag, new PatternFlatSelectFunction<Event, ComplexEvent>() {...} ); DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
时间时间模式下处理滞后数据
在CEP中,元素处理的顺序很重要。
为了保证在采用事件事件时以正确的顺序处理事件,最初将传入的事件放入缓冲区,其中事件基于它们的时间戳以升序排序
并且当watermark到达时,处理该缓冲区中时间戳小于watermark时间的所有元素。这意味着watermark之间的事件按事件时间顺序处理。
注意点:
在采用事件时间时,CEP library会假设watermark是正确的。
为了保证跨watermark的记录按照事件事件顺序处理,Flink的CEP库假定watermark是正确的,并将时间戳小于上次可见watermark的时间视为滞后事件。滞后事件不会被进一步处理。
以上是关于Flink 复杂事物处理的主要内容,如果未能解决你的问题,请参考以下文章