使用 Apache Airflow 更新和维护 postgres 表

Posted

技术标签:

【中文标题】使用 Apache Airflow 更新和维护 postgres 表【英文标题】:Upserting and maintaing postgres table using Apache Airflow 【发布时间】:2022-01-21 17:11:04 【问题描述】:

处理需要我从一个 postgres 表中提取数据并将数据更新到另一个环境中的另一个 Postgres 表的 ETL 过程(相同的列名)。目前,我正在 Windows EC2 实例中运行 python 作业,并且我正在使用 pangres upsert 库来更新现有行并插入新行。

但是,我的组织希望我在 AWS 上的托管 Apache Airflow 中移动 python ETL 脚本。

我一直在学习 DAG,大部分教程和文章都是关于使用钩子或运算符从 postgres 表中查询数据。

但是,我希望了解如何使用表 B 中的新记录(并忽略/覆盖现有匹配行)以增量方式(即 upsert)更新现有表 A。

任何解释如何执行这个简单任务的代码块 (DAG) 都会非常有帮助。

【问题讨论】:

【参考方案1】:

在 Apache Airflow 中,操作是使用运算符完成的。您可以将任何 Python 代码打包到运算符中,但最好的办法是始终使用预先存在的开源运算符(如果已经存在)。 Postgres 有一个运算符 (https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html)。

很难提供一个完整的例子来说明你应该为你的情况编写什么,但听起来你在这里采取的最佳方法是获取 Python ETL 脚本中存在的任何 SQL 并将其与Postgres 运算符。我链接的文档应该是一个很好的例子。

他们演示了插入数据、读取数据,甚至创建表作为先决条件。就像在 Python 脚本中一次执行一个行一样,在 DAG 中,运算符按照特定的顺序执行,具体取决于它们的连接方式,就像在他们的示例中一样:

create_pet_table >> populate_pet_table >> get_all_pets >> get_birth_date

在他们的示例中,在创建宠物表步骤成功之前不会填充宠物表,等等。

由于您的用例是将新数据从一个表复制到另一个表,因此我可以给您一些提示:

使用计划的 DAG 批量复制数据。 Airflow 不适合用于处理许多小数据的流式传输系统。 在 DAG 中使用 DAG 运行 (https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html) 的“逻辑日期”来了解运行应处理的数据间隔。这非常适合您的要求,即在每次运行期间只应复制新数据。如果您需要修复代码,它还会为您提供可重复的运行,然后在推送修复后重新运行每次运行(一次一批)。

【讨论】:

您提到的文章很棒,但是它没有给出更新插入新记录的具体示例。我看到有一个在其中插入行的示例,但这本身就是我正在寻找的。我正在寻找的最终解决方案是理想情况下 DAG 从表 A 中提取过去三个月的数据并将它们附加到表 B 中而不会重复。 编辑了我的答案,为您的用例提供一些提示。 实际上,我想我误解了你想要做什么。当您使用“upsert”一词时,听起来您的目标是至少处理一次数据。您希望在创建它时对其进行处理(通过将其从表 A 复制到表 B),然后在表 A 中更新它时处理它(通过再次将其复制到表 B,如果存在主键,则用其主键覆盖任何行) .您能否在问题中添加更多细节以阐明您的目标是什么?在某些时间点,您希望表 A 和 B 中的哪些数据以及示例时间表会有所帮助。 这个工作流示例下面的链接很好地解释了用例。 github.com/ThibTrip/pangres/wiki/Upsert Upsert 是您可以对表执行的操作。如果您需要知道如何对表进行 Postgres 操作,那么您只需阅读操作符即可。您将使用纯 Postgres SQL(而不是 pangres 库),并且 Postgres 运算符支持所有 Postgres SQL。您只需要将 pangres 正在执行的操作转换为 Postgres SQL。如果您正在谈论具有多个表的某种模式,则必须扩展您尝试对这些表执行的操作。有很多方法可以将数据从一个复制到另一个。

以上是关于使用 Apache Airflow 更新和维护 postgres 表的主要内容,如果未能解决你的问题,请参考以下文章

使用 Apache 气流存储和访问密码

Apache Atlas 和 Airflow 集成

安装 Apache Airflow 后出错

Apache Airflow 徽标路径

Bigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据

Apache Airflow - 添加 Google 身份验证