使用 Apache Beam 按键处理事件的总排序

Posted

技术标签:

【中文标题】使用 Apache Beam 按键处理事件的总排序【英文标题】:Processing Total Ordering of Events By Key using Apache Beam 【发布时间】:2017-08-25 20:07:23 【问题描述】:

问题背景

我正在尝试从实时流中为每个键生成事件项的总(线性)顺序,其中顺序是事件时间(源自事件有效负载)。

方法

我曾尝试使用流式实现此功能,如下所示:

1) 设置一个不重叠的顺序窗口,例如时长 5 分钟

2) 建立允许的迟到 - 丢弃迟到的事件是可以的

3) 设置累积模式以保留所有触发的窗格

4) 使用“AfterwaterMark”触发器

5) 处理触发窗格时,仅考虑最后一个窗格

6) 使用 GroupBy.perKey 确保此窗口中此键的所有事件都将作为单个资源上的一个单元进行处理

虽然这种方法可以确保给定窗口内每个键的线性顺序,但它不能保证跨多个窗口,例如可能有一个键的事件窗口,之后发生的事件与之前的窗口同时处理,如果第一个窗口失败并且必须重试,这很容易发生。

我正在考虑采用这种方法,首先可以处理实时流,以便通过键对事件进行分区并将它们写入由窗口范围命名的文件。 由于光束处理的并行性质,这些文件也会乱序生成。 然后,单个流程协调器可以将这些文件按顺序提交到批处理管道 - 只有在收到前一个文件并且下游处理已成功完成时才提交下一个。

问题是 Apache Beam 只会在该时间窗口中至少有一个时间元素时触发一个窗格。因此,如果事件中存在间隙,则生成的文件中可能存在间隙 - 即丢失文件。丢失文件的问题在于,协调批处理器无法区分是否知道时间窗口已经过去而没有数据,或者在这种情况下它无法继续直到文件最终到达。

强制触发事件窗口的一种方法可能是以某种方式将虚拟事件添加到每个分区和时间窗口的流中。但是,这很难做到……如果时间序列中有很大的差距,那么如果这些虚拟事件发生在很晚的事件周围,那么它们将被视为迟到而被丢弃。

是否有其他方法可以确保每个可能的事件窗口都有一个触发器,即使这会导致输出空文件?

从实时流中通过键生成总排序是 Apache Beam 的一个易于处理的问题吗?我应该考虑另一种方法吗?

【问题讨论】:

【参考方案1】:

根据您对易处理的定义,当然可以在 Apache Beam 中按事件时间戳为每个键完全排序一个流。

以下是设计背后的考虑:

    Apache Beam 不保证按顺序传输,因此在管道中没有用处。因此,我假设您正在这样做,以便您可以写入外部系统,只有在它们按顺序处理的情况下才能处理它们。 如果一个事件有时间戳t,你永远不能确定没有更早的事件会到达,除非你等到 t 是可丢弃的。

所以我们将这样做:

    我们将在全局窗口中编写一个使用state 和计时器(blog post still under review) 的ParDo。这使它成为一个按键的工作流程。 当元素到达时,我们将缓冲它们的状态。因此,您允许的延迟会影响您需要的数据结构的效率。您需要的是一个堆来查看和弹出最小时间戳和元素;没有内置的堆状态,所以我将其写为ValueState。 我们将设置一个事件时间计时器,以便在元素的时间戳不再矛盾时接收回调。

为了简洁起见,我将假设一个自定义的EventHeap 数据结构。在实践中,您希望将其分解为多个状态单元以最小化传输的数据。堆可能是对原始状态类型的合理补充。

我还将假设我们需要的所有编码器都已注册并专注于状态和计时器逻辑。

new DoFn<KV<K, Event>, Void>() 

  @StateId("heap")
  private final StateSpec<ValueState<EventHeap>> heapSpec = StateSpecs.value();

  @TimerId("next")
  private final TimerSpec nextTimerSpec = TimerSpec.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("heap") ValueState<EventHeap> heapState,
      @TimerId("next") Timer nextTimer) 
    EventHeap heap = firstNonNull(
      heapState.read(),
      EventHeap.createForKey(ctx.element().getKey()));
    heap.add(ctx.element().getValue());
    // When the watermark reaches this time, no more elements
    // can show up that have earlier timestamps
    nextTimer.set(heap.nextTimestamp().plus(allowedLateness);
  

  @OnTimer("next")
  public void onNextTimestamp(
      OnTimerContext ctx,
      @StateId("heap") ValueState<EventHeap> heapState,
      @TimerId("next") Timer nextTimer) 
    EventHeap heap = heapState.read();
    // If the timer at time t was delivered the watermark must
    // be strictly greater than t
    while (!heap.nextTimestamp().isAfter(ctx.timestamp())) 
      writeToExternalSystem(heap.pop());
    
    nextTimer.set(heap.nextTimestamp().plus(allowedLateness);
  

这有望让您开始朝着您的基础用例迈进。

【讨论】:

这很棒。 ParDo 函数的输出是否可以使用 OnTimerContext 对象从 OnTimer 方法而不是 ProcessElement 方法(process(..))生成? 是的,你也可以通过 OnTimerContext 输出。 “为简洁起见,我将假设一个自定义的 EventHeap 数据结构。在实践中,您可能希望将其分解为多个状态单元以最小化传输的数据。”我不确定这会有什么帮助。为了将每个新项目添加到堆中,您必须首先读取整个堆,然后将其写回。将事件的属性拆分为单独的值状态如何避免读取和写入所有这些状态? 我想到的是,在@ProcessElement 中,您可以对BagState 进行盲写,同时只需更新计时器以获取最小的下一个时间戳。但是最好的节省来自拥有真正的内置HeapState。我真的很喜欢这个主意。另外,为了触发计时器,底层存储系统肯定有一些这样的支持。 @grzes 我最终放弃了 DataFlow 作为一种完全按顺序处理事件的解决方案。我尝试使用 Kenn 建议的方法,但遇到了管道会挂起的各种问题。我试图寻求谷歌支持的帮助,但没有任何结果。我最终在 Apache Spark 上实现了一种批处理方法,该方法已被证明非常成功。

以上是关于使用 Apache Beam 按键处理事件的总排序的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Apache Beam Python 将输出写入动态路径

Apache Beam 处理文件

在 BigQuery Apache Beam 中访问 TableRow 列

谷歌布局大数据:开源平台Apache Beam正式发布

如何过滤坏和好的 json 事件,然后增加坏 json 记录的指标计数,并使用 java [关闭] 将这些记录存储在 apache Beam 中

Apache Beam是正确的特征预处理工具吗?