Apache Flink - 如果在 x 分钟内没有收到数据,则发送事件

Posted

技术标签:

【中文标题】Apache Flink - 如果在 x 分钟内没有收到数据,则发送事件【英文标题】:Apache Flink - Send event if no data was received for x minutes 【发布时间】:2018-04-14 01:26:34 【问题描述】:

如何使用 Flink 的 DataStream API 实现一个操作符,在一定时间内没有从流中接收到数据时发送事件?

【问题讨论】:

【参考方案1】:

这样的操作符可以使用ProcessFunction来实现。

DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L);

input
  // use keyBy to have keyed state. 
  // NullByteKeySelector will move all data to one task. You can also use other keys
  .keyBy(new NullByteKeySelector())
  // use process function with 60 seconds timeout
  .process(new TimeOutFunction(60 * 1000));

TimeOutFunction 定义如下。在此示例中,它使用处理时间。

public static class TimeOutFunction extends ProcessFunction<Long, Boolean> 

  // delay after which an alert flag is thrown
  private final long timeOut;
  // state to remember the last timer set
  private transient ValueState<Long> lastTimer;

  public TimeOutFunction(long timeOut) 
    this.timeOut = timeOut;
  

  @Override
  public void open(Configuration conf) 
    // setup timer state
    ValueStateDescriptor<Long> lastTimerDesc = 
      new ValueStateDescriptor<Long>("lastTimer", Long.class);
    lastTimer = getRuntimeContext().getState(lastTimerDesc);
  

  @Override
  public void processElement(Long value, Context ctx, Collector<Boolean> out) throws Exception 
    // get current time and compute timeout time
    long currentTime = ctx.timerService().currentProcessingTime();
    long timeoutTime = currentTime + timeOut;
    // register timer for timeout time
    ctx.timerService().registerProcessingTimeTimer(timeoutTime);
    // remember timeout time
    lastTimer.update(timeoutTime);
  

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Boolean> out) throws Exception 
    // check if this was the last timer we registered
    if (timestamp == lastTimer.value()) 
      // it was, so no data was received afterwards.
      // fire an alert.
      out.collect(true);
    
  

【讨论】:

一个小周。如果流至少接收一次数据,则此设置很好。任何检测方式,如果流根本没有接收到数据。一次都没有? 我可以将您的解决方案用于 Kafka 消费者,包括 processElement 中的 out.collect 吗? (我的完整问题是***.com/questions/58280077/…)。目前我的消费者不会停止并无限获取.. 在上面的例子中因为我们注册了一个ProcessingTimeTimer。我们是否应该指定 TimeCharacteristic.EventTime/ProcessingTime,或者在这种情况下无关紧要。有什么提示吗? 您可以随时注册处理时间计时器。如果您需要事件时间,您应该通过 TimeCharacteristics 启用它 这没有回答问题。这仅在流程函数至少接收一次元素时才有效,因为只能在“processElement”中创建计时器。【参考方案2】:

您可以使用自定义触发功能设置时间窗口。在触发函数中,每次接收到事件时,“onEvent”方法都会将 processingTimeTrigger 设置为“currentTime + desiredTimeDelay”。然后,当有新事件出现时,您删除之前设置的触发器并创建一个新的触发器。如果一个事件在系统时间是 processingTimeTrigger 上的时间之前没有出现,它会触发并且窗口将被处理。即使没有事件发生,要处理的事件列表也只是空的。

【讨论】:

代码可行吗?至少sn-p? 我会选择@F*** Hueske 的回答。为您的目的更进一步。

以上是关于Apache Flink - 如果在 x 分钟内没有收到数据,则发送事件的主要内容,如果未能解决你的问题,请参考以下文章

Apache Flink CEP如何检测事件是不是在x秒内没有发生?

Apache Flink - Window

Apache Flink 入门示例demo

Apache Flink 1.2.0正式发布及其功能介绍

回顾 | Apache Flink X Apache RocketMQ · 上海站(PPT下载)

Apache Flink Meetup 7.10 北京站,Flink x TiDB 专场等你来!