是否可以在 apache flink CEP 中处理多个流?

Posted

技术标签:

【中文标题】是否可以在 apache flink CEP 中处理多个流?【英文标题】:Is it possible to process multiple streams in apache flink CEP? 【发布时间】:2017-07-28 17:15:16 【问题描述】:

我的问题是,如果我们有两个原始事件流,即 SmokeTemperature,并且我们想找出复杂事件,即 Fire > 通过将运算符应用于原始流已经发生了,我们可以在 Flink 中这样做吗?

我问这个问题是因为到目前为止我看到的所有 Flink CEP 示例都只包含一个输入流。如果我错了,请纠正我。

【问题讨论】:

【参考方案1】:

简答 - 是的,您可以读取和处理多个流,并根据来自不同流源的事件类型触发规则。

长答案 - 我的要求有点相似,我的回答是基于您正在阅读来自不同 kafka 主题的不同流的假设。

从在单一来源中流式传输不同事件的不同主题中读取:

FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
        Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
        new StringSerializerToEvent(),
        props);

kafkaSource.assignTimestampsAndWatermarks(new 
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
        .filter(Objects::nonNull);

序列化程序读取数据并将它们解析为具有通用格式 - 例如。

@Data
public class BAMEvent 
 private String keyid;  //If key based partitioning is needed
 private String eventName; // For different types of events
 private String eventId;  // Any other field you need
 private long timestamp; // For event time based processing 

 public String toString()
   return eventName + " " + timestamp + " " + eventId + " " + correlationID;
 


然后,事情就很简单了,根据事件名称定义规则并比较事件名称以定义规则(您也可以定义复杂的规则如下):

Pattern.<BAMEvent>begin("first")
        .where(new SimpleCondition<BAMEvent>() 
          private static final long serialVersionUID = 1390448281048961616L;

          @Override
          public boolean filter(BAMEvent event) throws Exception 
            return event.getEventName().equals("event1");
          
        )
        .followedBy("second")
        .where(new IterativeCondition<BAMEvent>() 
          private static final long serialVersionUID = -9216505110246259082L;

          @Override
          public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception 

            if (!secondEvent.getEventName().equals("event2")) 
              return false;
            

            for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) 
              if (secondEvent.getEventId = firstEvent.getEventId()) 
                return true;
              
            
            return false;
          
        )
        .within(withinTimeRule);

我希望这能让您想到将一个或多个不同的流集成在一起。

【讨论】:

【参考方案2】:

我想知道是否可以完成严格的链接(如果可以使用 next,而不是 followBy),因为在给定的流中,特定时间戳可能有很多事件。所以说时间 t1-: a,b,c - 这三个事件来了,时间 t2-: a2,b2,c2 来到 flink 引擎。所以,我想知道我们如何获得 event(a).next(a2),因为它可能永远不会是这样,因为系列会类似于 -: 一种 b C a2 b2 c2

但是,如果 CEP 模块处理事件时将一个时间戳视为单个事件,那么这是有道理的。

【讨论】:

以上是关于是否可以在 apache flink CEP 中处理多个流?的主要内容,如果未能解决你的问题,请参考以下文章

大数据之CEP报警信息

Flink 复杂事件处理

在 Apache Flink 中动态添加模式而不重新启动作业

大数据计算引擎之Flink Flink CEP复杂事件编程

apache Flink初探

Flink CEP - Flink的复杂事件处理