Apache Beam Stateful DoFn 周期性输出所有 K/V 对
Posted
技术标签:
【中文标题】Apache Beam Stateful DoFn 周期性输出所有 K/V 对【英文标题】:Apache Beam Stateful DoFn Periodically Output All K/V Pairs 【发布时间】:2020-05-01 11:56:53 【问题描述】:我正在尝试使用有状态 DoFn(使用 @ProcessElement
和 @StateId
ValueState
元素)在 Apache Beam(通过 Scio)中聚合(每个键)流数据源。我认为这最适合我要解决的问题。要求是:
state.clear()
) 中逐出
每 5 分钟,无论是否看到任何新密钥,都应输出所有 尚未从状态中驱逐的密钥
鉴于这是一个流式管道并且将无限期地运行,在具有累积触发窗格的全局窗口上使用 combinePerKey
似乎会继续增加其内存占用以及随着时间的推移需要运行的数据量,所以我想避免它。此外,在对此进行测试时,(可能与预期的一样)它只是将新计算的聚合与历史输入一起附加到输出中,而不是使用每个键的最新值。
我的想法是,使用 StatefulDoFn 只会让我输出所有全局状态直到 now(),但这似乎不是一个简单的解决方案。我已经看到使用计时器为此人为执行回调的提示,以及可能使用缓慢增长的侧面输入映射 (How to solve Duplicate values exception when I create PCollectionView<Map<String,String>>) 并以某种方式刷新它,但这基本上需要迭代映射中的所有值而不是加入它。
我觉得我可能忽略了一些简单的东西来让它工作。我对 Beam 中的许多窗口和计时器概念相对较新,正在寻找有关如何解决此问题的任何建议。谢谢!
【问题讨论】:
明天我会尽力帮助解答这个问题! 我怀疑窗口/触发选项可能会影响您的解决方案。如果您设置计时器来控制输出速率,则不需要上游触发器。您可以尝试删除上游触发器吗? 谢谢@Pablo,这似乎有帮助。我现在可以看到密钥每 5 分钟输出一次,并且当新记录中不存在时,它们会通过计时器触发!知道为什么我会在给定窗口的输出中看到两次相同的键吗?我认为 TimestampCombiner.LATEST 会解决这个问题? 所以窗口/触发只会在“洗牌”发生时影响您的管道。在 GBK 上或在有状态 ParDo 之前完成洗牌。您可以尝试添加 GroupByKey 吗? - 我相信在你的情况下,时间戳组合器将确定来自你的 GBK 的 KV你说得对,Stateful DoFn 应该在这里帮助你。这是您可以做什么的基本草图。请注意,这仅输出没有密钥的总和。它可能不是你想要的,但它应该可以帮助你前进。
class CombiningEmittingFn extends DoFn<KV<Integer, Integer>, Integer>
@TimerId("emitter")
private final TimerSpec emitterSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
@StateId("done")
private final StateSpec<ValueState<Boolean>> doneState = StateSpecs.value();
@StateId("agg")
private final StateSpec<CombiningState<Integer, int[], Integer>>
aggSpec = StateSpecs.combining(
Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()), Sum.ofIntegers());
@ProcessElement
public void processElement(ProcessContext c,
@StateId("agg") CombiningState<Integer, int[], Integer> aggState,
@StateId("done") ValueState<Boolean> doneState,
@TimerId("emitter") Timer emitterTimer) throws Exception
if (SOME CONDITION)
countValueState.clear();
doneState.write(true);
else
countValueState.addAccum(c.element().getValue());
emitterTimer.align(Duration.standardMinutes(5)).setRelative();
@OnTimer("emitter")
public void onEmit(
OnTimerContext context,
@StateId("agg") CombiningState<Integer, int[], Integer> aggState,
@StateId("done") ValueState<Boolean> doneState,
@TimerId("emitter") Timer emitterTimer)
Boolean isDone = doneState.read();
if (isDone != null && isDone)
return;
else
context.output(aggState.getAccum());
// Set the timer to emit again
emitterTimer.align(Duration.standardMinutes(5)).setRelative();
很高兴与您一起迭代一些可行的方法。
【讨论】:
非常感谢@pablo!我在上面发布了一些代码 sn-ps(似乎与您的建议非常相似),但似乎无法正确刷新输出。我注意到两个不同之处是您建议的 TimerSpec 使用的是 PROCESSING_TIME 而不是 EVENT_TIME。此外,您建议使用 CombiningState。您认为这些影响足以导致我看到的问题吗?【参考方案2】:@Pablo 确实正确,StatefulDoFn 和计时器在这种情况下很有用。这是我能够开始工作的代码。
有状态的做Fn
// DomainState is a custom case class I'm using
type DoFnT = DoFn[KV[String, DomainState], KV[String, DomainState]]
class StatefulDoFn extends DoFnT
@StateId("key")
private val keySpec = StateSpecs.value[String]()
@StateId("domainState")
private val domainStateSpec = StateSpecs.value[DomainState]()
@TimerId("loopingTimer")
private val loopingTimer: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)
@ProcessElement
def process(
context: DoFnT#ProcessContext,
@StateId("key") stateKey: ValueState[String],
@StateId("domainState") stateValue: ValueState[DomainState],
@TimerId("loopingTimer") loopingTimer: Timer): Unit =
... logic to create key/value from potentially null values
if (keepState(value))
loopingTimer.align(Duration.standardMinutes(5)).setRelative()
stateKey.write(key)
stateValue.write(value)
if (flushState(value))
context.output(KV.of(key, value))
else
stateValue.clear()
@OnTimer("loopingTimer")
def onLoopingTimer(
context: DoFnT#OnTimerContext,
@StateId("key") stateKey: ValueState[String],
@StateId("domainState") stateValue: ValueState[DomainState],
@TimerId("loopingTimer") loopingTimer: Timer): Unit =
... logic to create key/value checking for nulls
if (keepState(value))
loopingTimer.align(Duration.standardMinutes(5)).setRelative()
if (flushState(value))
context.output(KV.of(key, value))
有管道
sc
.pubsubSubscription(...)
.keyBy(...)
.withGlobalWindow()
.applyPerKeyDoFn(new StatefulDoFn())
.withFixedWindows(
duration = Duration.standardMinutes(5),
options = WindowOptions(
accumulationMode = DISCARDING_FIRED_PANES,
trigger = AfterWatermark.pastEndOfWindow(),
allowedLateness = Duration.ZERO,
// Only take the latest per key during a window
timestampCombiner = TimestampCombiner.END_OF_WINDOW
))
.reduceByKey(mostRecentEvent())
.saveAsCustomOutput(TextIO.write()...)
【讨论】:
以上是关于Apache Beam Stateful DoFn 周期性输出所有 K/V 对的主要内容,如果未能解决你的问题,请参考以下文章
如何将 DoFn PTransform 应用于 Apache Beam 中的 PCollectionTuple