防止 Apache Beam / Dataflow 流 (python) 管道中的融合以消除管道瓶颈

Posted

技术标签:

【中文标题】防止 Apache Beam / Dataflow 流 (python) 管道中的融合以消除管道瓶颈【英文标题】:Prevent fusion in Apache Beam / Dataflow streaming (python) pipelines to remove pipeline bottleneck 【发布时间】:2019-07-14 00:18:57 【问题描述】:

我们目前正在使用 DataflowRunner 在 Apache Beam 上开发流式传输管道。我们正在从 Pub/Sub 读取消息并对它们进行一些处理,然后我们在滑动窗口中将它们窗口化(当前窗口大小为 3 秒,间隔也是 3 秒)。一旦窗口被触发,我们会对窗口内的元素进行一些后期处理。这个后处理步骤明显大于窗口大小,大约需要 15 秒。

管道的apache束代码:

input = ( pipeline | beam.io.ReadFromPubSub(subscription=<subscription_path>)
                   | beam.Map(process_fn))
windows = input | beam.WindowInto(beam.window.SlidingWindows(3, 3),
                                  trigger=AfterCount(30), 
                                  accumulation_mode = AccumulationModel.DISCARDING)
group = windows | beam.GroupByKey()
group | beam.Map(post_processing_fn)

如您所知,Dataflow 会尝试对您的流水线步骤进行一些优化。在我们的例子中,它从窗口开始将所有内容融合在一起(集群操作:1/处理 2/窗口+后处理),这导致仅由 1 个工作人员对所有窗口进行缓慢的顺序后处理。我们每隔 15 秒就会看到管道正在处理下一个窗口的日志。但是,我们希望让多个工作人员负责单独的窗口,而不是将工作量交给单个工作人员。

因此,我们一直在寻找防止这种融合发生的方法,因此 Dataflow 将窗口与窗口的后处理分开。通过这种方式,我们希望 Dataflow 能够再次将多个工作人员分配给已触发窗口的后处理。

到目前为止我们所做的尝试:

将工人数量增加到 20、30 甚至 40 但没有效果。仅将窗口分配给多个工作人员之前的步骤 运行流水线 5 或 10 分钟,但我们注意到在窗口化后没有重新分配工作人员来帮助完成这个更大的后处理步骤 窗口化后,将它们放回全局窗口中 使用虚拟密钥(如https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion 中所述)模拟另一个 GroupByKey,但没有任何成功。

最后两个动作确实创建了第三个集群操作(1/ 处理 2/ 窗口化 3/ 后处理),但我们注意到在窗口化之后仍然是同一个工作人员正在执行所有操作。 p>

有没有可以解决这个问题陈述的解决方案?

我们现在正在考虑的当前解决方法是构建另一个接收窗口的流式管道,以便这些工作人员可以并行处理窗口,但这很麻烦..

【问题讨论】:

您是否尝试过联系 Google 支持?对于此类特定的内部事务,他们通常比社区更能提供帮助。 【参考方案1】:

你做了正确的事来打破你的元素融合。我怀疑可能有问题让您陷入困境。

对于流式传输,单个键总是在同一个工作器中处理。是否有任何机会,您的所有或大部分记录都分配给一个键?如果是这样,您的处理将在单个工作人员中完成。

你可以做的事情是让窗口成为​​键的一部分,以便多个窗口的元素可以在不同的工作人员中处理,即使它们具有相同的键:

class KeyIntoKeyPlusWindow(core.DoFn):
  def process(self, element, window=core.DoFn.WindowParam):
    key, values = element
    yield ((key, window), element)

group = windows | beam.ParDo(KeyIntoKeyPlusWindow() | beam.GroupByKey()

一旦你完成了,你就可以应用你的后期处理:

group | beam.Map(post_processing_fn)

【讨论】:

以上是关于防止 Apache Beam / Dataflow 流 (python) 管道中的融合以消除管道瓶颈的主要内容,如果未能解决你的问题,请参考以下文章

GCP Dataflow + Apache Beam - 缓存问题

Dataflow 大型侧输入中的 Apache Beam

无法在 DataFlow Apache Beam 中创建通用日期转换类

Dataflow/Apache Beam 在啥阶段确认发布/订阅消息?

Apache Beam on Dataflow - 加载外部文件

Dataflow 中的自定义 Apache Beam Python 版本