使用 Pub/Sub 消息触发 Cloud Composer DAG
Posted
技术标签:
【中文标题】使用 Pub/Sub 消息触发 Cloud Composer DAG【英文标题】:Trigger Cloud Composer DAG with a Pub/Sub message 【发布时间】:2020-02-21 08:52:25 【问题描述】:我正在尝试创建要通过 Pub/Sub 消息触发的 Cloud Composer DAG。 Google 提供了以下示例,每次 Cloud Storage 存储桶发生更改时都会触发 DAG: https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf
但是,一开始他们说you can trigger DAGs in response to events, such as a change in a Cloud Storage bucket or a message pushed to Cloud Pub/Sub
。我花了很多时间试图弄清楚如何做到这一点,但没有结果。
你能帮忙或给我一些指示吗?谢谢!
【问题讨论】:
【参考方案1】:有两种方法可以通过 Pub/Sub 事件触发 DAG。
-
您可以在 DAG 的开头放置 PubSubPullSensor。每次 PubSubPullSensor 可以拉取 Pub/Sub 消息时,都会触发您的 DAG。它将执行 DAG 中的其余任务。
您还可以创建Cloud Function that acts as Pub/Sub trigger。并将Composer DAG triggering logic 放在 Cloud Function 触发器中。当消息发布到 Pub/Sub 主题时,Cloud Function 应该能够触发 Composer DAG。
【讨论】:
我正在研究第二个选项,但我不知道在 Pub/Sub 订阅中以Endpoint URL
的形式添加什么。有什么帮助吗?
@harry77 Endpoint URL at the Pub/Sub subscription
是什么意思。你在哪一步?
我尝试了第一个选项。将 PubSubPullSensor 放在开头,但在 Pub/Sub 主题上发布新消息后,DAG 无法自动触发。看起来它只是一个集成选项,使您能够阅读发布在 Pub/Sub 主题上的消息。
@BalajeeVenkatesh 如果我正确理解您的问题,解决方案是预先启动包含 PubSubPullSensor 的 DAG。因此,DAG 将始终在那里侦听 Pub/Sub 消息。
PubSubPullSensor 是一个模块,可用于从 Pub/Sub 订阅中读取消息。最终我们创建了一个包含这个模块的“任务”。预先启动 DAG 可以启动“拉取任务”并继续等待 Pub/Sub 主题的新消息。任务在读取所有消息的那一刻结束,并导致 DAG 的下一个任务的执行。这意味着第一个任务不会一直开启,因此 DAG 不会一直在那里收听消息。【参考方案2】:
要扩展您已发布的公共文档页面,您可以配置 Cloud Function 以在每次将消息发布到 Cloud Pub/Sub 主题时运行。 another public documentation page 有更多相关信息。
要将函数附加到主题,请在部署函数时设置--trigger-topic
标志:
gcloud functions deploy $FUNCTION_NAME --runtime $RUNTIME --trigger-topic $TOPIC_NAME
【讨论】:
感谢您的回复以上是关于使用 Pub/Sub 消息触发 Cloud Composer DAG的主要内容,如果未能解决你的问题,请参考以下文章
从 Google Cloud Run 托管应用程序触发的 Pub/Sub 消息需要很长时间
由 Pub Sub 触发的 Cloud Function 未发布预期消息
使用 Pub/Sub 触发器的 Google Cloud Function 可扩展性限制
如何使用 pub sub 从 Global EventArc 触发 Cloud Run