使用 Pub/Sub 消息触发 Cloud Composer DAG

Posted

技术标签:

【中文标题】使用 Pub/Sub 消息触发 Cloud Composer DAG【英文标题】:Trigger Cloud Composer DAG with a Pub/Sub message 【发布时间】:2020-02-21 08:52:25 【问题描述】:

我正在尝试创建要通过 Pub/Sub 消息触发的 Cloud Composer DAG。 Google 提供了以下示例,每次 Cloud Storage 存储桶发生更改时都会触发 DAG: https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf

但是,一开始他们说you can trigger DAGs in response to events, such as a change in a Cloud Storage bucket or a message pushed to Cloud Pub/Sub。我花了很多时间试图弄清楚如何做到这一点,但没有结果。

你能帮忙或给我一些指示吗?谢谢!

【问题讨论】:

【参考方案1】:

有两种方法可以通过 Pub/Sub 事件触发 DAG。

    您可以在 DAG 的开头放置 PubSubPullSensor。每次 PubSubPullSensor 可以拉取 Pub/Sub 消息时,都会触发您的 DAG。它将执行 DAG 中的其余任务。 您还可以创建Cloud Function that acts as Pub/Sub trigger。并将Composer DAG triggering logic 放在 Cloud Function 触发器中。当消息发布到 Pub/Sub 主题时,Cloud Function 应该能够触发 Composer DAG。

【讨论】:

我正在研究第二个选项,但我不知道在 Pub/Sub 订阅中以 Endpoint URL 的形式添加什么。有什么帮助吗? @harry77 Endpoint URL at the Pub/Sub subscription 是什么意思。你在哪一步? 我尝试了第一个选项。将 PubSubPullSensor 放在开头,但在 Pub/Sub 主题上发布新消息后,DAG 无法自动触发。看起来它只是一个集成选项,使您能够阅读发布在 Pub/Sub 主题上的消息。 @BalajeeVenkatesh 如果我正确理解您的问题,解决方案是预先启动包含 PubSubPullSensor 的 DAG。因此,DAG 将始终在那里侦听 Pub/Sub 消息。 PubSubPullSensor 是一个模块,可用于从 Pub/Sub 订阅中读取消息。最终我们创建了一个包含这个模块的“任务”。预先启动 DAG 可以启动“拉取任务”并继续等待 Pub/Sub 主题的新消息。任务在读取所有消息的那一刻结束,并导致 DAG 的下一个任务的执行。这意味着第一个任务不会一直开启,因此 DAG 不会一直在那里收听消息。【参考方案2】:

要扩展您已发布的公共文档页面,您可以配置 Cloud Function 以在每次将消息发布到 Cloud Pub/Sub 主题时运行。 another public documentation page 有更多相关信息。

要将函数附加到主题,请在部署函数时设置--trigger-topic 标志:

gcloud functions deploy $FUNCTION_NAME --runtime $RUNTIME --trigger-topic $TOPIC_NAME

【讨论】:

感谢您的回复

以上是关于使用 Pub/Sub 消息触发 Cloud Composer DAG的主要内容,如果未能解决你的问题,请参考以下文章

从 Google Cloud Run 托管应用程序触发的 Pub/Sub 消息需要很长时间

由 Pub Sub 触发的 Cloud Function 未发布预期消息

使用 Pub/Sub 触发器的 Google Cloud Function 可扩展性限制

如何使用 pub sub 从 Global EventArc 触发 Cloud Run

如何将属性从 Cloud Scheduler 传递到 Pub/Sub?

Google Cloud Pub/Sub - 捕获发送到死信主题的消息的消息传递失败原因[关闭]