Detecting Malicious Logins with Flink CEP
Posted by 逆风飞翔的小叔
Preface
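In the previous post we built a simple malicious-login detector on top of KeyedProcessFunction. The implementation turned out to be fairly tedious, which raises the question of whether there is a cleaner approach. CEP is a natural candidate.
CEP provides a rich and flexible set of event-matching patterns. For a task such as detecting two consecutive login failures within a given time window, a CEP pattern expresses the rule directly, without the trouble of registering timers by hand.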
Here is the code:
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

public class LoginFailCep {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Use event-time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Read the login log from a file
        String path = "E:\\code-self\\flink_study\\src\\main\\resources\\LoginLog.csv";
        DataStream<LoginEvent> loginEventStream = env.readTextFile(path)
                .map(line -> {
                    String[] fields = line.split(",");
                    return new LoginEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
                })
                // Tolerate events that arrive up to 3 seconds out of order
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(LoginEvent element) {
                        // The log stores seconds; Flink expects milliseconds
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Process the stream with CEP
        // 1. Define the pattern: a failed login immediately followed by another failed login, within 2 seconds
        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("firstFail")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return "fail".equalsIgnoreCase(loginEvent.getLoginState());
                    }
                })
                .next("secondFail")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return "fail".equalsIgnoreCase(loginEvent.getLoginState());
                    }
                })
                .within(Time.seconds(2));

        // 2. Apply the pattern to the stream, keyed by user ID, to obtain a pattern stream
        PatternStream<LoginEvent> patternStream =
                CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        // 3. Select the matched complex events and convert each match into a warning
        SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(new LoginMatchWarning());

        warningStream.print();
        env.execute("login fail job");
    }

    // Builds a warning from the two failed logins captured by the pattern
    public static class LoginMatchWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning> {
        @Override
        public LoginFailWarning select(Map<String, List<LoginEvent>> map) throws Exception {
            LoginEvent firstFailEvent = map.get("firstFail").get(0);
            LoginEvent secondFailEvent = map.get("secondFail").get(0);
            return new LoginFailWarning(firstFailEvent.getUserId(), firstFailEvent.getTimestamp(),
                    secondFailEvent.getTimestamp(), "fail 2 times");
        }
    }
}
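To make the matching rule concrete, the pattern above can be mimicked in plain Java without Flink. This is only an illustrative sketch and not how Flink CEP works internally: the Attempt class, the detect method and the sample data are hypothetical, the input is assumed to be sorted by timestamp already, and the window is treated as a strict upper bound on the gap between the two failures, which is an assumption about boundary handling rather than something this post states.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this is not how Flink CEP is implemented.
class ConsecutiveFailSketch {

    // Hypothetical stand-in for one login attempt (timestamp in seconds).
    static final class Attempt {
        final long userId;
        final String state;
        final long timestamp;

        Attempt(long userId, String state, long timestamp) {
            this.userId = userId;
            this.state = state;
            this.timestamp = timestamp;
        }
    }

    // Assumption: a pair counts only if the gap is strictly less than the window.
    static final long WINDOW_SECONDS = 2;

    // Returns one "userId,firstTs,secondTs" string per matched pair.
    // Assumes the attempts are already sorted by timestamp.
    static List<String> detect(List<Attempt> attempts) {
        // Last attempt seen per user, mirroring keyBy(userId) in the job.
        Map<Long, Attempt> previousByUser = new HashMap<>();
        List<String> warnings = new ArrayList<>();
        for (Attempt current : attempts) {
            Attempt previous = previousByUser.put(current.userId, current);
            boolean bothFailed = previous != null
                    && "fail".equalsIgnoreCase(previous.state)
                    && "fail".equalsIgnoreCase(current.state);
            if (bothFailed && current.timestamp - previous.timestamp < WINDOW_SECONDS) {
                warnings.add(current.userId + "," + previous.timestamp + "," + current.timestamp);
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        List<Attempt> attempts = new ArrayList<>();
        attempts.add(new Attempt(1L, "fail", 10L));
        attempts.add(new Attempt(1L, "fail", 11L));    // 1 s after a failure: match
        attempts.add(new Attempt(2L, "fail", 11L));
        attempts.add(new Attempt(2L, "success", 12L)); // breaks the run for user 2
        attempts.add(new Attempt(2L, "fail", 13L));
        attempts.add(new Attempt(1L, "fail", 20L));    // 9 s gap: outside the window
        System.out.println(detect(attempts));
    }
}
```

Running main prints [1,10,11]. Only user 1's first two failures qualify: user 2's failures are separated by a successful login, and user 1's third failure comes too late. Tracking the previous attempt per user plays the role that keyBy plus next() play in the Flink job.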
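The two entity classes used in the code, LoginEvent and LoginFailWarning, are covered in the previous post. Now run the program and observe the output:
The output identifies each user ID with two consecutive failed logins, along with the timestamps of the two failures.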
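The two entity classes are not reproduced in this post. As a rough guide, the sketch below is inferred only from the constructor calls and getters that appear in LoginFailCep. The field names ip, firstFailTime, lastFailTime and warningMsg are assumptions, and the classes in the previous post may well differ.

```java
// Hypothetical reconstruction of the entity classes; names marked "assumed" are guesses.
// For Flink to treat them as POJOs, declare each class public in its own file.

class LoginEvent {
    private Long userId;
    private String ip;         // assumed name for the second CSV column
    private String loginState; // the job treats "fail" as a failed login
    private Long timestamp;    // in seconds; the job multiplies by 1000 for Flink

    public LoginEvent() { }    // no-arg constructor, needed by Flink's POJO rules

    public LoginEvent(Long userId, String ip, String loginState, Long timestamp) {
        this.userId = userId;
        this.ip = ip;
        this.loginState = loginState;
        this.timestamp = timestamp;
    }

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getIp() { return ip; }
    public void setIp(String ip) { this.ip = ip; }
    public String getLoginState() { return loginState; }
    public void setLoginState(String loginState) { this.loginState = loginState; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
}

class LoginFailWarning {
    private Long userId;
    private Long firstFailTime; // assumed name: timestamp of the first failure
    private Long lastFailTime;  // assumed name: timestamp of the second failure
    private String warningMsg;  // assumed name: free-text message

    public LoginFailWarning() { }

    public LoginFailWarning(Long userId, Long firstFailTime, Long lastFailTime, String warningMsg) {
        this.userId = userId;
        this.firstFailTime = firstFailTime;
        this.lastFailTime = lastFailTime;
        this.warningMsg = warningMsg;
    }

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public Long getFirstFailTime() { return firstFailTime; }
    public void setFirstFailTime(Long firstFailTime) { this.firstFailTime = firstFailTime; }
    public Long getLastFailTime() { return lastFailTime; }
    public void setLastFailTime(Long lastFailTime) { this.lastFailTime = lastFailTime; }
    public String getWarningMsg() { return warningMsg; }
    public void setWarningMsg(String warningMsg) { this.warningMsg = warningMsg; }

    // print() on the warning stream shows whatever toString returns
    @Override
    public String toString() {
        return "LoginFailWarning{userId=" + userId + ", firstFailTime=" + firstFailTime
                + ", lastFailTime=" + lastFailTime + ", warningMsg='" + warningMsg + "'}";
    }
}
```

The getters, setters and no-arg constructors are there so that the classes can satisfy Flink's POJO requirements once they are made public; the job itself only calls getUserId, getLoginState and getTimestamp.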