大数据(9g)FlinkCEP
Posted 小基基o_O
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(9g)FlinkCEP相关的知识,希望对你有一定的参考价值。
文章目录
概述
- CEP
Complex Event Processing:复合事件处理
通过分析事件间的关系,从事件流中查询出符合要求的事件序列 - 例如【切菜=>洗菜=>炒菜】3个事件按时间序串联,是正常的事件流
当发现【切菜=>炒菜】忽略洗菜的事件流,可认为是异常事件
示例代码
环境和依赖
WIN10+JDK1.8+IDEA2021+Maven3.6.3
CEP额外依赖为flink-cep
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.14.6</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<lombok.version>1.18.24</lombok.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_$scala.binary.version</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_$scala.binary.version</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_$scala.binary.version</artifactId>
<version>$flink.version</version>
</dependency>
<!-- FlinkCEP -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_$scala.binary.version</artifactId>
<version>$flink.version</version>
</dependency>
</dependencies>
Java代码
监测 严格近邻的连续三次a的事件流
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CepPractice
public static void main(String[] args) throws Exception
//创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//添加数据源,确定水位线策略
SingleOutputStreamOperator<String> d = env.fromElements("c", "a", "a", "a", "a", "b", "a", "a")
.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> 1L));
//定义模式
Pattern<String, String> p = Pattern
.<String>begin("first")
.where(new SimpleCondition<String>()
@Override
public boolean filter(String value)
return value.equals("a");
)
.next("second")
.where(new SimpleCondition<String>()
@Override
public boolean filter(String value)
return value.equals("a");
)
.next("third")
.where(new SimpleCondition<String>()
@Override
public boolean filter(String value)
return value.equals("a");
);
//在流上匹配模型
PatternStream<String> patternStream = CEP.pattern(d, p);
//使用select方法将匹配到的事件流取出
patternStream.select((PatternSelectFunction<String, String>) map ->
//Map的key是事件名称(上面的first、second和third)
//Map的key对应的value是列表,储存匹配到的事件
String first = map.get("first").toString();
String second = map.get("second").toString();
String third = map.get("third").toString();
return first + "->" + second + "->" + third;
).print();
//执行
env.execute();
打印结果
[a]->[a]->[a]
[a]->[a]->[a]
上面代码可改成下面
留意.times(3).consecutive()
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
public class CepPractice2
public static void main(String[] args) throws Exception
//创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//添加数据源,确定水位线策略
SingleOutputStreamOperator<Tuple2<String, Long>> d = env.fromElements(
Tuple2.of("a", 1000L), Tuple2.of("a", 2000L), Tuple2.of("a", 3000L),
Tuple2.of("a", 4000L), Tuple2.of("b", 5000L), Tuple2.of("a", 6000L))
.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> element.f1));
//定义模式
Pattern<Tuple2<String, Long>, Tuple2<String, Long>> p = Pattern
.<Tuple2<String, Long>>begin("=a")
.where(new SimpleCondition<Tuple2<String, Long>>()
@Override
public boolean filter(Tuple2<String, Long> value)
return value.f0.equals("a");
)
.times(3)
.consecutive(); //严格连续
//在流上匹配模型
PatternStream<Tuple2<String, Long>> patternStream = CEP.pattern(d, p);
//使用select方法将匹配到的事件流取出
patternStream.select((PatternSelectFunction<Tuple2<String, Long>, String>) map ->
//Map的key是事件名称(上面的first、second和third)
//Map的key对应的value是列表,储存匹配到的事件
List<Tuple2<String, Long>> ls = map.get("=a");
String first = ls.get(0).f0;
String second = ls.get(1).f0;
String third = ls.get(2).f0;
return String.join("=>", first, second, third);
).print();
//执行
env.execute();
以上是关于大数据(9g)FlinkCEP的主要内容,如果未能解决你的问题,请参考以下文章
oracle 碎片管理和数据文件resize释放表空间和磁盘空间(以及sys.wri$_optstat_histgrm_history过大处理)