将 Pub/Sub 连接到 Dataflow Python 管道
Posted
技术标签:
【中文标题】将 Pub/Sub 连接到 Dataflow Python 管道【英文标题】:Connect Pub/Sub to Dataflow Python pipeline 【发布时间】:2019-07-23 09:48:24 【问题描述】:我正在编写一个处理电子邮件的 Dataflow 流式管道(在 Python 中)。 这个想法是,当一封电子邮件到达时,发布一条 Pub/Sub 消息,触发检索电子邮件并处理它的管道。 Pub/Sub 消息的内容是无用的,因为我只是用它来触发管道。
我在最后一部分遇到了一些麻烦。我设法部署了管道并将其连接到 Pub/Sub 主题,但是当我尝试对其进行测试(发布消息)时,没有任何反应。
我想我必须设置一个窗口来“收集”消息并在某个时候发出它们,但我应该怎么做呢? 有没有办法说“每次收到新的 Pub/Sub 消息时都启动管道,忽略其内容”?
提前致谢!
【问题讨论】:
【参考方案1】:我终于设法解决了我的问题。问题是由于从我为此目的定义的类中导入了自定义管道选项。此导入阻止触发管道。删除它我终于设法触发了管道。
对于那些可能需要它的人来说,有罪的进口是
from engine.user_options import UserOptions
导入的类是
import apache_beam as beam
class UserOptions(beam.options.pipeline_options.PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--env', type=str)
【讨论】:
【参考方案2】:您能否分享有关您的管道以及电子邮件存储位置的更多信息?
我建议您查看 Beam 中可用的一些示例管道。
This pipeline reads from PubSub, modifies and writes to PubSub without windowing。这是最简单的,听起来就是你需要的。 This pipeline reads from PubSub and applies windowing to get a leaderboard如果您分享有关您的管道/代码的更多信息,我可以尝试与您一起迭代。
【讨论】:
以上是关于将 Pub/Sub 连接到 Dataflow Python 管道的主要内容,如果未能解决你的问题,请参考以下文章
如何从 Dataflow 批量(有效)发布到 Pub/Sub?
Dataflow 是不是应该使用来自 Pub/Sub 主题或订阅的事件? [复制]
有没有办法让 Pub/Sub -> Dataflow -> BigQuery 模板处理每条消息的多个记录?
直接流式传输到 BigQuery 与通过 Google Pub/Sub + Dataflow 流式传输的优缺点