Apache Beam Stateful DoFn 周期性输出所有 K/V 对

Posted

技术标签:

【中文标题】Apache Beam Stateful DoFn 周期性输出所有 K/V 对【英文标题】:Apache Beam Stateful DoFn Periodically Output All K/V Pairs 【发布时间】:2020-05-01 11:56:53 【问题描述】:

我正在尝试使用有状态 DoFn(使用 @ProcessElement@StateId ValueState 元素)在 Apache Beam(通过 Scio)中聚合(每个键)流数据源。我认为这最适合我要解决的问题。要求是:

对于给定的键,所有时间的记录都被聚合(基本上求和) - 我不关心以前计算的聚合,只关心最近的 根据我控制的某些条件,密钥可能会从状态 (state.clear()) 中逐出 每 5 分钟,无论是否看到任何新密钥,都应输出所有 尚未从状态中驱逐的密钥

鉴于这是一个流式管道并且将无限期地运行,在具有累积触发窗格的全局窗口上使用 combinePerKey 似乎会继续增加其内存占用以及随着时间的推移需要运行的数据量,所以我想避免它。此外,在对此进行测试时,(可能与预期的一样)它只是将新计算的聚合与历史输入一起附加到输出中,而不是使用每个键的最新值。

我的想法是,使用 StatefulDoFn 只会让我输出所有全局状态直到 now(),但这似乎不是一个简单的解决方案。我已经看到使用计时器为此人为执行回调的提示,以及可能使用缓慢增长的侧面输入映射 (How to solve Duplicate values exception when I create PCollectionView<Map<String,String>>) 并以某种方式刷新它,但这基本上需要迭代映射中的所有值而不是加入它。

我觉得我可能忽略了一些简单的东西来让它工作。我对 Beam 中的许多窗口和计时器概念相对较新,正在寻找有关如何解决此问题的任何建议。谢谢!

【问题讨论】:

明天我会尽力帮助解答这个问题! 我怀疑窗口/触发选项可能会影响您的解决方案。如果您设置计时器来控制输出速率,则不需要上游触发器。您可以尝试删除上游触发器吗? 谢谢@Pablo,这似乎有帮助。我现在可以看到密钥每 5 分钟输出一次,并且当新记录中不存在时,它们会通过计时器触发!知道为什么我会在给定窗口的输出中看到两次相同的键吗?我认为 TimestampCombiner.LATEST 会解决这个问题? 所以窗口/触发只会在“洗牌”发生时影响您的管道。在 GBK 上或在有状态 ParDo 之前完成洗牌。您可以尝试添加 GroupByKey 吗? - 我相信在你的情况下,时间戳组合器将确定来自你的 GBK 的 KV> 的时间戳(并且它不会丢弃旧元素) - 但在 GBK 之后你可以丢弃旧元素自己元素 为什么将计时器设置为 150 秒,但在窗口中使用 5 分钟?确保有输出? 【参考方案1】:

你说得对,Stateful DoFn 应该在这里帮助你。这是您可以做什么的基本草图。请注意,这仅输出没有密钥的总和。它可能不是你想要的,但它应该可以帮助你前进。

class CombiningEmittingFn extends DoFn<KV<Integer, Integer>, Integer> 

  @TimerId("emitter")
  private final TimerSpec emitterSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @StateId("done")
  private final StateSpec<ValueState<Boolean>> doneState = StateSpecs.value();

  @StateId("agg")
  private final StateSpec<CombiningState<Integer, int[], Integer>>
      aggSpec = StateSpecs.combining(
          Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()), Sum.ofIntegers());

  @ProcessElement
  public void processElement(ProcessContext c,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) throws Exception 
        if (SOME CONDITION) 
          countValueState.clear();
          doneState.write(true);
         else 
          countValueState.addAccum(c.element().getValue());
          emitterTimer.align(Duration.standardMinutes(5)).setRelative();
        
      
    

  @OnTimer("emitter")
  public void onEmit(
      OnTimerContext context,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) 
      Boolean isDone = doneState.read();
      if (isDone != null && isDone) 
        return;
       else 
        context.output(aggState.getAccum());
        // Set the timer to emit again
        emitterTimer.align(Duration.standardMinutes(5)).setRelative();
      
    
  
  

很高兴与您一起迭代一些可行的方法。

【讨论】:

非常感谢@pablo!我在上面发布了一些代码 sn-ps(似乎与您的建议非常相似),但似乎无法正确刷新输出。我注意到两个不同之处是您建议的 TimerSpec 使用的是 PROCESSING_TIME 而不是 EVENT_TIME。此外,您建议使用 CombiningState。您认为这些影响足以导致我看到的问题吗?【参考方案2】:

@Pablo 确实正确,StatefulDoFn 和计时器在这种情况下很有用。这是我能够开始工作的代码。

有状态的做Fn

// DomainState is a custom case class I'm using
type DoFnT = DoFn[KV[String, DomainState], KV[String, DomainState]]

class StatefulDoFn extends DoFnT 

  @StateId("key")
  private val keySpec = StateSpecs.value[String]()

  @StateId("domainState")
  private val domainStateSpec = StateSpecs.value[DomainState]()

  @TimerId("loopingTimer")
  private val loopingTimer: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)

  @ProcessElement
  def process(
      context: DoFnT#ProcessContext,
      @StateId("key") stateKey: ValueState[String],
      @StateId("domainState") stateValue: ValueState[DomainState],
      @TimerId("loopingTimer") loopingTimer: Timer): Unit = 

    ... logic to create key/value from potentially null values

    if (keepState(value)) 
      loopingTimer.align(Duration.standardMinutes(5)).setRelative()

      stateKey.write(key)
      stateValue.write(value)

      if (flushState(value)) 
        context.output(KV.of(key, value))
      
     else 
      stateValue.clear()
    
  

  @OnTimer("loopingTimer")
  def onLoopingTimer(
      context: DoFnT#OnTimerContext,
      @StateId("key") stateKey: ValueState[String],
      @StateId("domainState") stateValue: ValueState[DomainState],
      @TimerId("loopingTimer") loopingTimer: Timer): Unit = 

    ... logic to create key/value checking for nulls

    if (keepState(value)) 

      loopingTimer.align(Duration.standardMinutes(5)).setRelative()

      if (flushState(value)) 
        context.output(KV.of(key, value))
      
    
  

有管道

sc
  .pubsubSubscription(...)
  .keyBy(...)
  .withGlobalWindow()
  .applyPerKeyDoFn(new StatefulDoFn())
  .withFixedWindows(
    duration = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      trigger = AfterWatermark.pastEndOfWindow(),
      allowedLateness = Duration.ZERO,
      // Only take the latest per key during a window
      timestampCombiner = TimestampCombiner.END_OF_WINDOW
    ))
  .reduceByKey(mostRecentEvent())
  .saveAsCustomOutput(TextIO.write()...)

【讨论】:

以上是关于Apache Beam Stateful DoFn 周期性输出所有 K/V 对的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam:DoFn 与 PTransform

如何将 DoFn PTransform 应用于 Apache Beam 中的 PCollectionTuple

将 PANDAS 与 Apache Beam 一起使用

Python Apache Beam 侧输入断言错误

如何在 python apache Beam 的窗口中订购元素?

无法从 Beam 中的 GCS 读取 PubSub gz 文件