大数据之CEP报警信息

Posted 潇洒哥浩浩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据之CEP报警信息相关的知识,希望对你有一定的参考价值。

package com.baway.loginfaildetect

import java.util

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
* @ author yulong
* @ createTime 2020-07-07 14:39
*/
object LoginFailWithCep {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[String] = env.socketTextStream("hadoop102", 6666)

val loginEventStream: KeyedStream[LoginEvent, Long] = stream.map(data => {
val dataArr: Array[String] = data.split(",")
LoginEvent(dataArr(0).trim.toLong, dataArr(1).trim, dataArr(2).trim, dataArr(3).trim.toLong)
}).assignAscendingTimestamps(_.eventTime * 1000)
.keyBy(_.userId)

//2. 定义匹配模式
val loginFailPattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("begin").where(_.eventType == "fail")
.next("next").where(_.eventType == "fail")
.within(Time.seconds(2))

//3. 把定义的pattern应用到输入流
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream, loginFailPattern)

//4. 使用select方法检出符合模式的事件序列
val loginFailDataStream: DataStream[Warning] = patternStream.select(new LoginFailMatch())

//打印输出
loginFailDataStream.print("warning")
//输入数据
loginEventStream.print("input")

env.execute()

}

}

//自定义PatternSelectFunction 输出报警信息
class LoginFailMatch() extends PatternSelectFunction[LoginEvent, Warning]{
override def select(pattern: util.Map[String, util.List[LoginEvent]]): Warning = {

//第一个失败事件
val firstFail: LoginEvent = pattern.get("begin").iterator().next()
//第二个失败事件
val secondFail: LoginEvent = pattern.get("next").iterator().next()
//包装成Warning输出
Warning(firstFail.userId, firstFail.eventTime, secondFail.eventTime, "在2秒内连续2次登陆失败。")

}
}

以上是关于大数据之CEP报警信息的主要内容,如果未能解决你的问题,请参考以下文章

大数据计算引擎之Flink Flink CEP复杂事件编程

java大数据之“Kafka”

大数据开发-Flink-CEP的主要原理和使用

大数据开发-Flink-CEP的主要原理和使用

大数据开发-Flink-CEP的主要原理和使用

大数据系统之监控系统