大数据之CEP报警信息
Posted 潇洒哥浩浩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据之CEP报警信息相关的知识,希望对你有一定的参考价值。
package com.baway.loginfaildetect
import java.util
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @ author yulong
* @ createTime 2020-07-07 14:39
*/
object LoginFailWithCep {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[String] = env.socketTextStream("hadoop102", 6666)
val loginEventStream: KeyedStream[LoginEvent, Long] = stream.map(data => {
val dataArr: Array[String] = data.split(",")
LoginEvent(dataArr(0).trim.toLong, dataArr(1).trim, dataArr(2).trim, dataArr(3).trim.toLong)
}).assignAscendingTimestamps(_.eventTime * 1000)
.keyBy(_.userId)
//2. 定义匹配模式
val loginFailPattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("begin").where(_.eventType == "fail")
.next("next").where(_.eventType == "fail")
.within(Time.seconds(2))
//3. 把定义的pattern应用到输入流
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream, loginFailPattern)
//4. 使用select方法检出符合模式的事件序列
val loginFailDataStream: DataStream[Warning] = patternStream.select(new LoginFailMatch())
//打印输出
loginFailDataStream.print("warning")
//输入数据
loginEventStream.print("input")
env.execute()
}
}
//自定义PatternSelectFunction 输出报警信息
class LoginFailMatch() extends PatternSelectFunction[LoginEvent, Warning]{
override def select(pattern: util.Map[String, util.List[LoginEvent]]): Warning = {
//第一个失败事件
val firstFail: LoginEvent = pattern.get("begin").iterator().next()
//第二个失败事件
val secondFail: LoginEvent = pattern.get("next").iterator().next()
//包装成Warning输出
Warning(firstFail.userId, firstFail.eventTime, secondFail.eventTime, "在2秒内连续2次登陆失败。")
}
}
以上是关于大数据之CEP报警信息的主要内容,如果未能解决你的问题,请参考以下文章