收到 PubSub 通知后触发 Dataflow 作业
Posted
技术标签:
【中文标题】收到 PubSub 通知后触发 Dataflow 作业【英文标题】:Trigger Dataflow job upon receiving of a PubSub notification 【发布时间】:2021-11-01 23:19:34 【问题描述】:我用 Apache Beam 编写了一个 Dataflow 管道,让您对代码有一个基本的了解:
Job= (
p
|"cretae">>beam.Create(["message"])
|"job 1" >> beam.ParDo(dofn1())
|"job 2" >> beam.ParDo(dofn2())
|"job 3" >> beam.ParDo(dofn3())
)
目前,我通过创建随机消息来触发数据流管道,消息的内容无关紧要,因为它仅用于触发管道。只是想知道是否有办法在收到 PubSub 通知时触发此管道。也许通过使用 Apache Beam Pubsub API?有人可以举一个这样的例子吗?谢谢
【问题讨论】:
你看过数据流模板了吗? cloud.google.com/dataflow/docs/concepts/dataflow-templates 感谢您的建议纪尧姆,我已经检查过了。虽然数据流模板能够处理流式作业。使用 beam.io.readfrompubsub 会更适合我的用例 【参考方案1】:你是对的。您可以将数据流管道设置为从 GCP 发布/订阅主题中读取。您可以直接从主题中阅读,但我建议创建订阅并将数据流管道与订阅连接(为什么?如果您想重新启动管道并且不会错过任何到达主题的消息,它将防止您丢失消息在停止它和重新启动它之间)。
假设您已经设置了 GPC 发布/订阅主题和订阅,以下是您的操作方法。您需要记住订阅路径。
import apache_beam as beam
import logging
logging.basicConfig(
format='%(asctime)s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
with beam.Pipeline(options=pipeline_options) as pipeline:
(pipeline
| "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=input_topic_subscription_path)
| "Window into fixed intervals" >> beam.WindowInto(beam.FixedWindows(5))
| "Log the messages" >> beam.Map(lambda message: logging.info(message))
)
上面的代码将每 5 秒从 pub/sub 主题读取消息,然后记录该消息。
【讨论】:
以上是关于收到 PubSub 通知后触发 Dataflow 作业的主要内容,如果未能解决你的问题,请参考以下文章
在 Dataflow Managed Service 中运行时,Dataflow 未读取 PubSub 消息