Airflow dag 中的 postgres_operator 问题

Posted

技术标签:

【中文标题】Airflow dag 中的 postgres_operator 问题【英文标题】:Issues with postgres_operator in Airflow dag 【发布时间】:2017-09-25 02:40:38 【问题描述】:

我目前正在使用 Airflow 1.8.2 来安排一些 EMR 任务,然后在我们的 Redshift 集群上执行一些长时间运行的查询。为此,我正在使用 postgres_operator。查询运行大约需要 30 分钟。但是,一旦完成,连接就永远不会关闭,并且操作员会再运行一个半小时,直到每次在 2 小时标记处终止。终止消息是服务器意外关闭了连接。

我检查了 Redshift 端的日志,它显示查询已运行并且连接已关闭。不知何故,这从未传回给 Airflow。关于我可以检查的更多内容的任何指示都会有所帮助。为了提供更多信息,我的 Airflow 安装是 https://github.com/puckel/docker-airflow docker 映像的扩展,在 ECS 集群中运行并使用 SQLite 作为后端,因为我仍在测试 Airflow。另外,我使用sequential executor 作为后端。我将不胜感激在这件事上的任何帮助。

【问题讨论】:

【参考方案1】:

我们之前遇到过类似的问题,但我使用 SQLAlchemy 到 Redshift,如果您使用的是 postgres_operator,它应该非常相似。如果长时间运行的查询没有看到任何活动,Redshift 似乎会关闭连接,在您的情况下,30 分钟是相当长的查询。 检查https://www.postgresql.org/docs/9.5/static/runtime-config-connection.html 您有三个设置,tcp_keepalives_idletcp_keepalives_idletcp_keepalives_count,它们会向 redshift 发送实时消息以指示“嘿,我还活着。 您可以将以下内容作为参数传递,如下所示:connect_args='keepalives': 1, 'keepalives_idle':60, 'keepalives_interval': 60

【讨论】:

感谢您的回答。我最终定义了一个自定义运算符和挂钩,让我可以覆盖 keepalives 参数。

以上是关于Airflow dag 中的 postgres_operator 问题的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Python 在 Airflow 中成功触发另一个 DAG 时触发 DAG?

airflow数据表结构了解一下

如何限制 Airflow 一次只运行一个 DAG 实例?

如何删除气流中的默认示例 dag

气流未在 /usr/local/airflow/dags 中加载 dag

如何向 Airflow 添加新的 DAG?