将 Pub/Sub 连接到 Dataflow Python 管道

Posted

技术标签:

【中文标题】将 Pub/Sub 连接到 Dataflow Python 管道【英文标题】:Connect Pub/Sub to Dataflow Python pipeline 【发布时间】:2019-07-23 09:48:24 【问题描述】:

我正在编写一个处理电子邮件的 Dataflow 流式管道(在 Python 中)。 这个想法是,当一封电子邮件到达时,发布一条 Pub/Sub 消息,触发检索电子邮件并处理它的管道。 Pub/Sub 消息的内容是无用的,因为我只是用它来触发管道。

我在最后一部分遇到了一些麻烦。我设法部署了管道并将其连接到 Pub/Sub 主题,但是当我尝试对其进行测试(发布消息)时,没有任何反应。

我想我必须设置一个窗口来“收集”消息并在某个时候发出它们,但我应该怎么做呢? 有没有办法说“每次收到新的 Pub/Sub 消息时都启动管道,忽略其内容”

提前致谢!

【问题讨论】:

【参考方案1】:

我终于设法解决了我的问题。问题是由于从我为此目的定义的类中导入了自定义管道选项。此导入阻止触发管道。删除它我终于设法触发了管道。

对于那些可能需要它的人来说,有罪的进口是

from engine.user_options import UserOptions

导入的类是

import apache_beam as beam


class UserOptions(beam.options.pipeline_options.PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--env', type=str)

【讨论】:

【参考方案2】:

您能否分享有关您的管道以及电子邮件存储位置的更多信息?

我建议您查看 Beam 中可用的一些示例管道。

This pipeline reads from PubSub, modifies and writes to PubSub without windowing。这是最简单的,听起来就是你需要的。 This pipeline reads from PubSub and applies windowing to get a leaderboard

如果您分享有关您的管道/代码的更多信息,我可以尝试与您一起迭代。

【讨论】:

以上是关于将 Pub/Sub 连接到 Dataflow Python 管道的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Dataflow 批量(有效)发布到 Pub/Sub?

Dataflow 是不是应该使用来自 Pub/Sub 主题或订阅的事件? [复制]

有没有办法让 Pub/Sub -> Dataflow -> BigQuery 模板处理每条消息的多个记录?

直接流式传输到 BigQuery 与通过 Google Pub/Sub + Dataflow 流式传输的优缺点

如何在 BigQuery 插入错误时崩溃/停止 DataFlow Pub/Sub 摄取

当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub