收到 PubSub 通知后触发 Dataflow 作业

Posted

技术标签:

【中文标题】收到 PubSub 通知后触发 Dataflow 作业【英文标题】:Trigger Dataflow job upon receiving of a PubSub notification 【发布时间】:2021-11-01 23:19:34 【问题描述】:

我用 Apache Beam 编写了一个 Dataflow 管道,让您对代码有一个基本的了解:

Job= (
    p
    |"cretae">>beam.Create(["message"])
    |"job 1" >> beam.ParDo(dofn1())
    |"job 2" >> beam.ParDo(dofn2())
    |"job 3" >> beam.ParDo(dofn3())
    )

目前,我通过创建随机消息来触发数据流管道,消息的内容无关紧要,因为它仅用于触发管道。只是想知道是否有办法在收到 PubSub 通知时触发此管道。也许通过使用 Apache Beam Pubsub API?有人可以举一个这样的例子吗?谢谢

【问题讨论】:

你看过数据流模板了吗? cloud.google.com/dataflow/docs/concepts/dataflow-templates 感谢您的建议纪尧姆,我已经检查过了。虽然数据流模板能够处理流式作业。使用 beam.io.readfrompubsub 会更适合我的用例 【参考方案1】:

你是对的。您可以将数据流管道设置为从 GCP 发布/订阅主题中读取。您可以直接从主题中阅读,但我建议创建订阅并将数据流管道与订阅连接(为什么?如果您想重新启动管道并且不会错过任何到达主题的消息,它将防止您丢失消息在停止它和重新启动它之间)。

假设您已经设置了 GPC 发布/订阅主题和订阅,以下是您的操作方法。您需要记住订阅路径。

import apache_beam as beam
import logging

logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')

with beam.Pipeline(options=pipeline_options) as pipeline:
    (pipeline
    | "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=input_topic_subscription_path)
    | "Window into fixed intervals" >> beam.WindowInto(beam.FixedWindows(5))
    | "Log the messages" >> beam.Map(lambda message: logging.info(message))
    )

上面的代码将每 5 秒从 pub/sub 主题读取消息,然后记录该消息。

【讨论】:

以上是关于收到 PubSub 通知后触发 Dataflow 作业的主要内容,如果未能解决你的问题,请参考以下文章

GCP Pubsub 中的消息丢失和重复

在 Dataflow Managed Service 中运行时,Dataflow 未读取 PubSub 消息

本地 Pubsub 模拟器不适用于 Dataflow

TPL Dataflow,数据块收到第一项时的通知

xmpp pubsub service(XEP-0060) 收到订阅请求后能否创建节点?

在 Dataflow Python 中从 PubSub 读取 AVRO 消息