Flink CEP 实现恶意登录检测

Posted 逆风飞翔的小叔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink CEP 实现恶意登录检测相关的知识,希望对你有一定的参考价值。

前言

在前一篇中,我们基于KeyedProcessFunction 实现了一个恶意登录检测的简单案例,但是在具体实现过程中我们发现,过程还是比较繁琐的,有没有更好的实现呢,就可以考虑使用CEP的方式;

我们知道,CEP提供了非常丰富而灵活的事件匹配模式,借助这一点,比如要实现检测特定时间窗口内连续2次登录失败时间,使用CEP就非常方便,而不需要注册定时器那样来的麻烦一些;

下面直接上代码,

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.net.URL;
import java.util.List;
import java.util.Map;

public class LoginFailCep 

    public static void main(String[] args) throws Exception

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //时间语义设置
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //从文件中读取数据
        String path = "E:\\\\code-self\\\\flink_study\\\\src\\\\main\\\\resources\\\\LoginLog.csv";

        DataStream<LoginEvent> loginEventStream = env.readTextFile(path)
                .map(line -> 
                    String[] fields = line.split(",");
                    return new LoginEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
                )
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) 
                    @Override
                    public long extractTimestamp(LoginEvent element) 
                        return element.getTimestamp() * 1000L;
                    
                );

        //通过CEP处理

        //1、定义一个匹配模式
        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("firstFail").where(new SimpleCondition<LoginEvent>() 
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception 
                return "fail".equalsIgnoreCase(loginEvent.getLoginState());
            
        )
                .next("secondFail").where(new SimpleCondition<LoginEvent>() 
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception 
                        return "fail".equalsIgnoreCase(loginEvent.getLoginState());
                    
                )
                .within(Time.seconds(2));

        //2、将匹配模式应用到数据流上面,得到一个 pattern stream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        //3、从流中检测出符合条件的复杂事件,进行转换处理,得到报警信息
        SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(new LoginMatchWarning());

        warningStream.print();
        env.execute("login fail job");

    

    public static class LoginMatchWarning implements PatternSelectFunction<LoginEvent,LoginFailWarning>

        @Override
        public LoginFailWarning select(Map<String, List<LoginEvent>> map) throws Exception 
            LoginEvent firstFailEvent = map.get("firstFail").get(0);
            LoginEvent secondFailEvent = map.get("secondFail").get(0);
            return new LoginFailWarning(firstFailEvent.getUserId(),firstFailEvent.getTimestamp(),secondFailEvent.getTimestamp(),"fail 2 times");
        
    


代码中用到的两个实体类可以参考上一篇的代码,下面来运行下这段代码,观察下效果,

 

从结果可以检测出,两次连续登录失败的用户ID以及事件发生的事件信息

以上是关于Flink CEP 实现恶意登录检测的主要内容,如果未能解决你的问题,请参考以下文章

如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了

Watermark 在 Flink CEP 中远远落后

案例简介flink CEP

Flink CEP - Flink的复杂事件处理

“广播状态”为 Flink 的 CEP 库解除了“动态模式”功能的实现是啥意思?

是否可以在 apache flink CEP 中处理多个流?