如何在 BigQuery 插入错误时崩溃/停止 DataFlow Pub/Sub 摄取
Posted
技术标签:
【中文标题】如何在 BigQuery 插入错误时崩溃/停止 DataFlow Pub/Sub 摄取【英文标题】:How to Crash/Stop DataFlow Pub/Sub Ingestion on BigQuery Insert Error 【发布时间】:2017-11-11 17:57:10 【问题描述】:我正在寻找一种方法,让 Google DataFlow 作业在发生(特定)异常时停止从 Pub/Sub 摄取。
来自 Pub/Sub 的事件是通过 PubsubIO.Read.Bound<TableRow>
使用 TableRowJsonCoder
读取的 JSON,并使用 TableRowJsonCoder
直接流式传输到 BigQuery
BigQueryIO.Write.Bound
。
(中间有一个ParDo
,它会更改一个字段的内容,并按天发生一些自定义分区,但这与此目的无关。)
当从 PubSub 提取的事件/行中的字段不是目标 BigQuery 表中的列时,DataFlow 作业会在运行时记录 IOExceptions,声称它无法插入行,但似乎确认这些消息并继续运行.
我想要做的是停止从 Pub/Sub 提取消息和/或使 Dataflow 作业崩溃,以便警报可以基于最旧的未确认消息的年龄。至少我想确保那些未能插入 BigQuery 的 Pub/Sub 消息未被确认,以便我可以解决问题,重新启动 Dataflow 作业并再次使用这些消息。
我知道这里描述了一种处理错误输入的建议解决方案:https://cloud.google.com/blog/big-data/2016/01/handling-invalid-inputs-in-dataflow
我也知道 Apache Beam 上的这个 PR,它允许在没有违规字段的情况下插入行: https://github.com/apache/beam/pull/1778
但是在我的情况下,我真的不想防止错误输入,而是防止程序员错误,即新字段被添加到推送到 Pub/Sub 的 JSON 消息中,但相应的 DataFlow 作业是未更新。所以我并没有真正有错误的数据,我只是想在程序员犯了一个错误而不是在更改消息格式之前没有部署新的 Dataflow 作业时崩溃。
我认为可以(类似于博客文章解决方案)创建一个自定义 ParDo
来验证每一行并引发未被捕获并导致崩溃的异常。
但理想情况下,我希望有一些配置不处理插入错误并记录它,而只是使作业崩溃或至少停止摄取。
【问题讨论】:
“所以我并没有真正的数据错误,我只是想在程序员犯了一个错误而不是在更改有关消息格式的任何内容之前部署新的 Dataflow 作业时崩溃。” - 退后一步,当有人进行更改时,为什么不简单地强制更新部署管道中的 Dataflow 管道?我假设您正在进行某种集成/单元测试? 不适合管道,不。它相当纤薄,或多或少只使用搁板组件。在合成数据上测试它会像初始设置一样容易忘记更改,只是开销更大,不是吗? 【参考方案1】:您可以有一个带有 DoFn 的 ParDo,它位于 BQ 写入之前。 DoFn 将负责每 X 分钟获取一次输出表模式,并验证要写入的每条记录是否与预期的输出模式匹配(如果不匹配则抛出异常)。
Old Pipeline:
PubSub -> Some Transforms -> BQ Sink
New Pipeline:
PubSub -> Some Transforms -> ParDo(BQ Sink Validator) -> BQ Sink
这样做的好处是,一旦有人修复了输出表架构,管道就会恢复。您需要抛出一个很好的错误消息,说明传入的 PubSub 消息有什么问题。
或者,您可以让BQ Sink Validator
将消息输出到 PubSub DLQ(监控其大小)。在操作上,您必须更新表,然后重新摄取 DLQ 作为输入。这样做的好处是只有错误消息会阻塞管道执行。
【讨论】:
这就是我的意思“我认为有可能(类似于博客文章解决方案)创建一个自定义 ParDo 来验证每一行并抛出一个未被捕获的异常并导致崩溃。”即使 BigQuery 插入失败,消息也会得到确认,这让我感到困惑(如果我理解正确,这意味着消息根据this 保留在管道中的某处)。我想知道是否可以通过适当的配置来避免这种情况,无论是通过不确认还是通过崩溃。 Dataflow 流式传输管道将永远重试(目前无法配置为执行其他任何操作),它们目前不会根据失败自动取消/失败。取消管道需要用户显式调用 Dataflow(从管道外部或管道内部,例如来自 BQ Sink Validator)。 Dataflow 将管道分成许多段,因此消息在进入管道后立即被 ACK,因为这些段中的每一个都是独立的,因此通过以下方式跟踪每个源 PubSub 消息并不高效管道并在出现故障时自动将消息 NACK。这通常是不可行的,因为用户为单个输入生成多个输出(多输出 DoFn),并且多个输入可能产生单个输出(组合器)。 所以BQ Sink Validator
不会阻止管道确认消息,它只会重新尝试永远验证消息,因此不会丢失它。相比之下,BQ Sink 不会重试,而只是丢弃消息。那是对的吗?但这也意味着在这种情况下我不能排干管道?我可以从异常处理代码中取消管道吗?
管道仍然会确认消息,但消息不会丢失,因为它们会卡在重试循环中而不会丢失。据我所知,BQ 接收器不会丢弃消息,并且也会无限期地重试。是的,您可以在“卡住”时取消管道。不,您不能排空管道,因为它会不断失败。您始终可以创建一个死信队列,向其中发送错误消息,而不是从管道内抛出异常以保持管道运行,从而成功排出。以上是关于如何在 BigQuery 插入错误时崩溃/停止 DataFlow Pub/Sub 摄取的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 bigrquery 库将非重复记录插入 BigQuery?