从 pubsub->bigquery 移动到 pubsub->gcs (avro)->bigquery

Posted

技术标签:

【中文标题】从 pubsub->bigquery 移动到 pubsub->gcs (avro)->bigquery【英文标题】:Moving from pubsub->bigquery to pubsub->gcs (avro)->bigquery 【发布时间】:2020-04-20 10:55:54 【问题描述】:

我们当前的数据管道将我们的事件“直接”流式传输到 bigquery。 我们在 pubsub 中有一个消息流,我们首先使用数据流读取、丰富并写入其他 pubsub 主题,然后使用另一个数据流作业读取它,然后写入 bigquery。 它工作正常,但不支持正确的错误处理 - 我们只是丢弃无效消息,而不是处理它们,或者至少将它们保存以备后用。 我们正在考虑改进流程,将无效消息放在一边,并允许稍后对其进行简单修复。 我的第一种方法是将这些有问题的消息写入不同的 pubsub 主题,然后从那里处理它们,但很少有人建议将它们保存到 GCS(可能是 AVRO 文件)中。 问题是:如果我们使用 GCS 和 AVRO,为什么不对所有消息都这样做?与其丰富并写入 pubsub,为什么不丰富并写入 GCS 呢? 如果我们这样做,我们可以使用 AVROIO()watchForNewFiles(),这看起来很简单。 但这听起来太简单了,也太好了。在开始编码之前,我担心几件事:

我知道在数据流上使用 windows 会使流作为批处理数据, 但它比每 X 检查新文件灵活得多 分钟。例如,我将如何处理延迟数据等? 工作无休止地运行,AVRO 文件将被堆放在一个桶中,watchForNewFiles() 假设可以正常工作吗? 它会基于文件时间戳吗?命名格式 ?保持“清单” 已知的旧文件??阅读FileIO代码,看来方法是 很幼稚,这意味着桶越大,存储的时间越长 比赛将进行。

我想念什么吗?这个解决方案不是比 pubsub 更不适合无限流吗?

【问题讨论】:

有什么理由让您拥有 2 条不同的管道,而不是只有一个来完成所有工作?另外,不确定我理解的对不对,写AVRO而不是阅读时不是必须处理迟到的吗? 【参考方案1】: 有a set of APIs控制如何处理迟到的数据 我想如果您使用watchForNewFiles() 轮询单个无限增长的 GCS 存储桶会出现问题。我找不到提到list API 调用的可扩展性的官方文档,但认为它具有 O(n) 复杂性是合理的。如果您想在生产环境中使用管道并订阅 GCP 支持,我建议您与 GCP 支持讨论轮询大型 GCS 存储桶的可扩展性。

【讨论】:

以上是关于从 pubsub->bigquery 移动到 pubsub->gcs (avro)->bigquery的主要内容,如果未能解决你的问题,请参考以下文章

GCP - 从 PubSub 到 BigQuery 的消息

数据流:从 Pubsub RuntimeException 导出到 Bigquery

PubSub 到 BigQuery - Python 中的数据流/Beam 模板?

从 Pubsub 在 BigQuery 中编写查询

在 Java 中将 protobuf 转换为 bigquery

如何将 JSON 数据发布到 PubSub,然后推送到 BigQuery?