python Dataflow DoFn生命周期中的光束设置()刷新多长时间?

Posted

技术标签:

【中文标题】python Dataflow DoFn生命周期中的光束设置()刷新多长时间?【英文标题】:How long beam setup() refresh in python Dataflow DoFn life cycle? 【发布时间】:2020-12-03 22:05:46 【问题描述】:

我有一个流式传输管道,我需要从 BigQuery 查询作为我的管道转换的参考。由于 BigQuery 表仅在 2 周内更改,因此我将查询缓存放在 setup() 而不是 start_bundle() 中。通过观察日志,我看到 start_bundle() 将在 DoFn 生命周期中刷新它的值,大约 50-100 个元素过程,但 setup() 永远不会刷新。有什么办法可以解决这个问题吗?

【问题讨论】:

【参考方案1】:

虽然您没有提供代码,但我会根据您的解释回答您的问题。

首先,关于DoFn.start_bundle(),每个包都会调用此函数,DataFlow 将根据执行期间收集的指标来决定这些包的大小。

Second,DoFn.setup() 每个工作人员调用一次。只有在重新启动工作人员时才会再次调用它。此外,作为比较,DoFn.processElement() 每个元素调用一次。

由于您需要每周刷新两次查询,因此使用"Slowly-changing lookup cache" 将非常适合SideInput。当您有一个不时更改的查找表时,您可以使用此方法。所以你需要更新查找的结果。但是,您可以使用流模式,而不是在批处理模式下使用单个查询。它允许您基于 GlobalWindow 更新查找结果(在您的情况下为查询结果)。之后,有了这个侧面输入,您就可以在您的主流 PCollection 中使用它。

注意:我必须指出,作为一个限制,sideInputs 无法正常处理大量数据(许多 Gbs 或 Tb)。此外,explanation 信息量很大。

【讨论】:

【参考方案2】:

上面的答案很好。作为替代方案,您可以调用 start_bundle() 中的方法返回结果的缓存版本,只要它足够新鲜,否则会从 BQ 进行完整读取。见,例如Python in-memory cache with time to live

【讨论】:

以上是关于python Dataflow DoFn生命周期中的光束设置()刷新多长时间?的主要内容,如果未能解决你的问题,请参考以下文章

收到 PubSub 通知后触发 Dataflow 作业

python Django请求生命周期

python-django的生命周期

客户生命周期-python install lifetimes

为啥没有适用于 python 的 Spring DI(组件生命周期)框架?

从 C 扩展跟踪 CPython 对象的生命周期