使用从 S3 事件调用的 AWS Lambda 触发 Airflow DAG

Posted

技术标签:

【中文标题】使用从 S3 事件调用的 AWS Lambda 触发 Airflow DAG【英文标题】:Triggering Airflow DAG using AWS Lambda called from an S3 event 【发布时间】:2020-02-09 14:32:29 【问题描述】:

我正在考虑构建如下工作流程:

我有一个应用程序将近 1000 个 csv 文件写入 s3 存储桶 MY_BUCKET 中的文件夹 MY_DIRECTORY。现在我想从 s3 存储桶中解析这些文件并使用 Apache Airflow 加载到 mysql 数据库中。

通过阅读这里的几篇文章:Airflow S3KeySensor - How to make it continue running 和 Airflow s3 connection using UI,我认为最好使用 AWS lambda 触发我的 Airflow DAG,一旦文件到达 s3 文件夹就会调用它。

作为 Airflow 和 Lambda 的新手,我不知道如何设置 lambda 来触发 Airflow DAG。在这方面,如果有人请指点一下,那将非常有帮助。谢谢。

【问题讨论】:

【参考方案1】:

创建您要触发的 DAG,然后利用 Airflow 提供的实验性 REST API。

你可以在这里阅读它们:https://airflow.apache.org/docs/stable/api.html

您尤其希望使用以下端点:

POST /api/experimental/dags/<DAG_ID>/dag_runs

您可以在 中传递 DAG 的名称以正确触发它。此外,您可以显式传递 DAG 必须处理的文件的名称

curl -X POST \
  http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '"conf":"\"FILE_TO_PROCESS\":\"value\""'

然后在 DAG 中使用 Hook 来读取您指定的文件。

【讨论】:

以上是关于使用从 S3 事件调用的 AWS Lambda 触发 Airflow DAG的主要内容,如果未能解决你的问题,请参考以下文章

如何从AWS Lambda检索数据并将其显示在AWS S3托管的静态网站上?

从本地系统而不是S3上运行的node.js应用程序调用AWS Lambda

AWS Lambda 函数写入 S3

AWS Lambda使用S3

AWS Lambda 中的 Amazon S3 waitFor()

Cloudformation 模板在 S3 事件上触发 Lambda