使用 DataflowRunner 和 Dataflow 服务运行时,PubsubIO 不会将自定义时间戳属性输出为 context.timestamp
Posted
技术标签:
【中文标题】使用 DataflowRunner 和 Dataflow 服务运行时,PubsubIO 不会将自定义时间戳属性输出为 context.timestamp【英文标题】:PubsubIO does not output custom timestamp attribute as context.timestamp when running with DataflowRunner and Dataflow service 【发布时间】:2019-03-13 06:49:51 【问题描述】:我正在处理一个 Apache Beam 项目,该项目遇到了与自定义时间戳属性相关的 Dataflow 服务和 PubsubIO 问题。 Beam SDK 的当前版本是 2.7.0。
在项目中,我们有 2 个 Dataflow 作业通过 PubSub 主题和订阅进行通信:
第一个管道(将数据下沉到 PubSub)
此管道适用于基于消息的消息,因此除了GlobalWindows
(Beam 默认)之外,它没有应用自定义窗口策略。在这个管道的末端,我们将所有已经分配了一个属性映射(包括它们的事件时间戳,例如“published_at”)的消息下沉(写入)到一个使用 PubsubIO.writeMessages()
的 PubSub 主题。
注意:如果我们使用PubsubIO.writeMessages().withTimestampAttribute()
,这个方法会告诉PubsubIO.ShardFn
、PubsubIO.WriteFn
和PubsubClient
去write/overwrite下沉管道的处理时间 到地图中的此属性。
第二个管道(从 PubSub 读取数据)
在第二个流水线(读取流水线)中,我们尝试了PubsubIO.readMessagesWithAttributes().withTimestampAttribute("published_at")
和PubsubIO.readStrings().withTimestampAttribute("published_at")
作为源。
ProcessContext.timestamp()
的下游阶段等于它们的
事件时间戳"published_at"
。
但是当使用 DataflowRunner 运行时,ProcessContext.timestamp()
始终设置为接近实时,接近下沉
管道的处理时间。我们检查并可以确认那些
时间戳不是来自 PubSub 的发布时间。所有的数据都是
然后分配到错误的窗口与其事件域相比
时间戳。我们预计迟交的数据不会被分配
进入无效的窗口。
注意:在我们打开第二个管道以获取某种历史/后期数据之前,我们已经让 Pubsub 主题填充了大量数据。
Pubsub messages with invalid context timestamp
假设的根本原因
深入研究DataflowRunner的源代码,我们可以看到Dataflow Service使用完全不同的Pubsub代码(在管道构建时覆盖PubsubIO.Read)来读取和接收到Pubsub .
所以如果我们想使用 Beam SDK 的 PubsubIO,我们必须使用实验选项"enable_custom_pubsub_source"
。但到目前为止还没有运气,因为我们遇到了这个问题https://jira.apache.org/jira/browse/BEAM-5674 并且无法测试 Beam SDK 的 Pubsub 代码。
解决办法
我们当前的解决方法是,在将窗口分配给消息的步骤之后,我们实现了 DoFn
来检查他们的事件时间戳与他们的@987654337 @。 如果窗口无效,那么我们只需删除消息,然后运行每周或半周的作业以从历史来源纠正它们。最好有一些缺失的数据,而不是计算不正确的数据。
Messages dropped due to invalid windows
请与我们分享此案的经验。我们知道,从 Dataflow 水印管理的角度来看,如果摄取的数据是稀疏的(超时不够密集),水印会自我调整为当前实时。
我们还认为,我们对 Dataflow 服务维护 PubsubUnboundedSource 的输出时间戳的方式存在误解,因为我们对 Apache Beam 和 Google 的 Dataflow 还很陌生,所以有些事情我们还不知道。
非常感谢!
【问题讨论】:
【参考方案1】:我找到了解决此问题的方法。在我的下沉管道中,与 RFC 3339 标准相比,时间戳属性设置为错误的日期格式。格式化的日期缺少“Z”字符。我们要么修复了“Z”字符,要么更改为使用自纪元以来的毫秒数。两者都运作良好。
但有一点是,当 Dataflow 服务无法解析错误的日期格式时,它会发出警告或抛出错误,而是花费所有元素的处理时间,因此,它们被分配到错误的 event_time 窗口。
【讨论】:
以上是关于使用 DataflowRunner 和 Dataflow 服务运行时,PubsubIO 不会将自定义时间戳属性输出为 context.timestamp的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apache Beam 在 GCP DataflowRunner 上没有名为“IPython”的模块
DataflowRunner 需要 gcpTempLocation,但无法从 PipelineOptions 检索值
Eclipse 上带有 Dataflow Runner 的 Apache Beam MinimalWordcount 示例
Apache pig 错误 org.apache.pig.backend.hadoop.executionengine.Launcher - 错误:org.apache.avro.file.DataF