Apache Airflow - Slow to parse SQL queries on AWS MWAA

Posted: 2021-04-10 05:04:57

Question:

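I'm trying to build a DAG on AWS MWAA that exports data from Postgres (RDS) to S3. It exports 385 tables in total, but as soon as MWAA tries to parse all the queries in my task things go wrong: the DAG gets stuck in the running state and never starts my task.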

Basically, the process will:

    - Load the table schema
    - Rename some columns
    - Export the data to S3
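The rename/mask step is delegated to `helper.masking_pii`, whose body isn't shown in the question. A hedged sketch of what such a step could look like, assuming PII columns are replaced by an MD5 hash while keeping their original name as the alias. `mask_pii_columns` and the hashing rule are hypothetical, not the author's helper.

```python
from typing import Iterable, List


def mask_pii_columns(columns: Iterable[str], pii_columns: Iterable[str]) -> List[str]:
    """Return SELECT-list expressions, hashing any column listed as PII.

    Hypothetical stand-in for helper.masking_pii; the real masking rule is unknown.
    """
    pii = {c.lower() for c in pii_columns}
    exprs = []
    for col in columns:
        # Quote the identifier, doubling any embedded double quotes.
        quoted = '"' + col.replace('"', '""') + '"'
        if col.lower() in pii:
            # Keep the original name as the alias so the CSV header is unchanged.
            exprs.append(f"md5({quoted}::text) as {quoted}")
        else:
            exprs.append(quoted)
    return exprs


print(",\n".join(mask_pii_columns(["id", "email"], ["EMAIL"])))
```

The expressions use only double quotes, so they can be embedded safely inside the single-quoted statement that `aws_s3.query_export_to_s3` receives.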

Function

def export_to_s3(dag, conn, db, pg_hook, export_date, s3_bucket, schemas):

    tasks = []
    run_queries = []

    for schema, features in schemas.items():
        t = features.get("tables")
        if t:
            tables = t
        else:
            # Note: this queries Postgres every time export_to_s3() runs.
            tables = helper.get_tables(pg_hook, schema).table_name.tolist()

        is_full_export = features.get("full")

        for table in tables:
            # One information_schema query per table, again at call time.
            columns = helper.get_table_schema(
                pg_hook, table, schema
            ).column_name.tolist()
            masked_columns = helper.masking_pii(columns, pii_columns=PII_COLS)
            masked_columns_str = ",\n".join(masked_columns)

            if is_full_export:
                statement = f'select {masked_columns_str} from {db}.{schema}."{table}"'
            else:
                statement = f'select {masked_columns_str} from {db}.{schema}."{table}" order by random() limit 10000'
            s3_bucket_key = export_date + "_" + schema + "_" + table + ".csv"
            sql_export = f"""
            SELECT * from aws_s3.query_export_to_s3(
                '{statement}',
                    aws_commons.create_s3_uri(
                        '{s3_bucket}',
                        '{s3_bucket_key}',
                        'ap-southeast-2'),
                        options := 'FORMAT csv, DELIMITER $$|$$'
            )""".strip()
            run_queries.append(sql_export)



def get_table_schema(pg_hook, table_name, table_schema):
    """Gets the schema details of a given table in a given schema."""
    query = """
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_schema = '{0}'
      AND table_name = '{1}'
    order by ordinal_position
    """.format(table_schema, table_name)

    df_schema = pg_hook.get_pandas_df(query)
    return df_schema


def get_tables(pg_hook, schema):
    """Lists the base tables in a schema, skipping _sdc_rejected."""
    query = """
    select table_name from information_schema.tables
    where table_schema = '{}' and table_type = 'BASE TABLE' and table_name != '_sdc_rejected' """.format(schema)

    df_schema = pg_hook.get_pandas_df(query)
    return df_schema
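`get_table_schema` is called once per table, so with 385 tables every parse of the DAG file sends hundreds of separate queries to RDS. A minimal sketch, assuming the same `information_schema` source, that fetches all column names for a schema in a single query and groups them in Python. `COLUMNS_BY_SCHEMA_SQL` and `group_columns` are hypothetical names.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# One query per schema instead of one per table; %s is psycopg2's placeholder style.
COLUMNS_BY_SCHEMA_SQL = """
SELECT table_name, column_name
FROM information_schema.columns
WHERE table_schema = %s
ORDER BY table_name, ordinal_position
"""


def group_columns(rows: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Group (table_name, column_name) rows into {table: [columns in row order]}."""
    columns: Dict[str, List[str]] = defaultdict(list)
    for table_name, column_name in rows:
        columns[table_name].append(column_name)
    return dict(columns)


# With an Airflow Postgres hook the rows could come from, e.g.:
#   rows = pg_hook.get_records(COLUMNS_BY_SCHEMA_SQL, parameters=(schema,))
rows = [("users", "id"), ("users", "email"), ("orders", "id")]
print(group_columns(rows))  # {'users': ['id', 'email'], 'orders': ['id']}
```

Unlike `get_tables`, this query does not filter out views; joining against `information_schema.tables` on `table_type = 'BASE TABLE'` would be needed to match it exactly.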

Task

    task = PostgresOperator(
        sql=run_queries,
        postgres_conn_id=conn,
        task_id="export_to_s3",
        dag=dag,
        autocommit=True,
    )

    tasks.append(task)

    return tasks
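In `export_to_s3` the generated SELECT is pasted between single quotes inside the `aws_s3.query_export_to_s3` call, so a single quote anywhere in the statement (for example a masking expression such as `'***' as email`) would end the literal early. A minimal sketch of the same string construction with quote doubling. `sql_literal` and `build_export_sql` are hypothetical names, `my-bucket` is a placeholder, and the `aws_s3` / `aws_commons` calls simply mirror the question's SQL.

```python
def sql_literal(value: str) -> str:
    """Render a Python string as a PostgreSQL string literal, doubling any single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_export_sql(db: str, schema: str, table: str, select_list: str,
                     s3_bucket: str, export_date: str, full_export: bool,
                     region: str = "ap-southeast-2", sample_rows: int = 10000) -> str:
    """Build one aws_s3.query_export_to_s3 call for a single table (hypothetical helper)."""
    statement = f'select {select_list} from {db}.{schema}."{table}"'
    if not full_export:
        # Random sample, as in the question's non-full export.
        statement += f" order by random() limit {sample_rows}"
    s3_key = f"{export_date}_{schema}_{table}.csv"
    uri = (f"aws_commons.create_s3_uri({sql_literal(s3_bucket)}, "
           f"{sql_literal(s3_key)}, {sql_literal(region)})")
    return (
        "SELECT * FROM aws_s3.query_export_to_s3(\n"
        f"    {sql_literal(statement)},\n"
        f"    {uri},\n"
        "    options := 'FORMAT csv, DELIMITER $$|$$'\n"
        ")"
    )


sql = build_export_sql("mydb", "public", "users", "'***' as email",
                       "my-bucket", "2021-04-10", full_export=False)
print(sql)
```

The embedded `'***'` comes out as `''***''` inside the outer literal, which Postgres reads back as the original text.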

Airflow list_dags output

DAGS
-------------------------------------------------------------------
mydag
-------------------------------------------------------------------
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 1
Total task number: 3
DagBag parsing time: 159.94030800000002
-----------------------------------------------------+--------------------+---------+----------
file                                                 | duration           | dag_num | task_num 
-----------------------------------------------------+--------------------+---------+----------
/mydag.py                                            | 159.05215199999998 |       1 |        3 
/ActivationPriorityCallList/CallList_Generator.py    | 0.878734           |       0 |        0 
/ActivationPriorityCallList/CallList_Preprocessor.py | 0.00744            |       0 |        0 
/ActivationPriorityCallList/CallList_Emailer.py      | 0.001154           |       0 |        0 
/airflow_helperfunctions.py                          | 0.000828           |       0 |        0 
-----------------------------------------------------+--------------------+---------+----------
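The report attributes almost all of the parse time (about 159 s) to `/mydag.py`. To see how much of that goes into the database helpers, one option is to wrap them in a timing decorator. A minimal sketch; `timed`, `TIMINGS`, `CALLS` and `slow_lookup` are hypothetical names, with `slow_lookup` standing in for a helper such as `helper.get_table_schema`.

```python
import functools
import time
from collections import defaultdict

# Accumulated wall-clock seconds and call counts, keyed by function name.
TIMINGS = defaultdict(float)
CALLS = defaultdict(int)


def timed(func):
    """Record how long each call to `func` takes, using time.perf_counter."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            TIMINGS[func.__name__] += time.perf_counter() - start
            CALLS[func.__name__] += 1
    return wrapper


@timed
def slow_lookup(x):
    time.sleep(0.01)  # stands in for a database round trip
    return x


for i in range(3):
    slow_lookup(i)
print(dict(CALLS), round(TIMINGS["slow_lookup"], 3))
```

In the DAG file this would mean reassigning, e.g., `helper.get_table_schema = timed(helper.get_table_schema)` before `export_to_s3` runs, then logging `TIMINGS` and `CALLS` at the end of the file.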

Observations

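If I enable loading for only one table in the task, it runs fine, but it fails when loading is enabled for all tables. The behavior is the same when Airflow runs from Docker pointing at the RDS instance.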
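One table working while 385 tables fail is consistent with `export_to_s3` querying `information_schema` once per table while the DAG is being built (presumably at module level), so the cost grows with the table count on every parse. A common Airflow pattern, and not the fix proposed in the answer, is to move those lookups into the task so parsing stays cheap. A minimal sketch under that assumption: `export_schema` and `build_statement` are hypothetical names, the hook import assumes the Airflow 2 Postgres provider, and masking is passed in as a callable rather than reimplemented.

```python
from typing import Callable, List, Optional, Sequence


def build_statement(db: str, schema: str, table: str, select_list: str,
                    full_export: bool, sample_rows: int = 10000) -> str:
    """Same SELECT the question builds, without any database lookups."""
    statement = f'select {select_list} from {db}.{schema}."{table}"'
    if not full_export:
        statement += f" order by random() limit {sample_rows}"
    return statement


def export_schema(conn_id: str, db: str, schema: str, export_date: str, s3_bucket: str,
                  full_export: bool = False, tables: Optional[Sequence[str]] = None,
                  mask: Optional[Callable[[List[str]], List[str]]] = None) -> None:
    """Look up tables and columns and run the exports when the task executes."""
    # Deferred import: nothing here touches the database while the scheduler parses the file.
    # Airflow 2 provider path; on Airflow 1.10.x the hook is in airflow.hooks.postgres_hook.
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id=conn_id)
    if not tables:
        tables = [row[0] for row in hook.get_records(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema = %s AND table_type = 'BASE TABLE' "
            "AND table_name != '_sdc_rejected'",
            parameters=(schema,),
        )]
    for table in tables:
        columns = [row[0] for row in hook.get_records(
            "SELECT column_name FROM information_schema.columns "
            "WHERE table_schema = %s AND table_name = %s ORDER BY ordinal_position",
            parameters=(schema, table),
        )]
        # `mask` would be something like the question's helper.masking_pii.
        exprs = mask(columns) if mask else columns
        statement = build_statement(db, schema, table, ",\n".join(exprs), full_export)
        s3_key = f"{export_date}_{schema}_{table}.csv"
        # Bind the statement and S3 coordinates as parameters instead of splicing them into quotes.
        hook.run(
            "SELECT * FROM aws_s3.query_export_to_s3(%s, "
            "aws_commons.create_s3_uri(%s, %s, 'ap-southeast-2'), "
            "options := 'FORMAT csv, DELIMITER $$|$$')",
            autocommit=True,
            parameters=(statement, s3_bucket, s3_key),
        )
```

It could be wired up as one `PythonOperator` per schema, passing `python_callable=export_schema` and the arguments through `op_kwargs`, so the scheduler only constructs a handful of lightweight operators when it parses the file.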

[Screenshot: Airflow list_dags output]


Answer 1:

The problem was solved when I changed these values on MWAA:

    webserver.web_server_master_timeout
    webserver.web_server_worker_timeout

The default value was 30; I changed them to 480 (both settings are in seconds).

Link to the documentation:

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html
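On MWAA these overrides are set as Airflow configuration options on the environment, either in the console or through the API. A hedged sketch using boto3's `mwaa` client; `timeout_overrides` and `apply_overrides` are hypothetical names, values are passed as strings, and the environment name is a placeholder you would supply. Depending on how the API merges options, any other existing overrides may need to be included in the same dict.

```python
from typing import Dict


def timeout_overrides(seconds: int) -> Dict[str, str]:
    """Build MWAA configuration overrides for the two webserver timeouts."""
    value = str(seconds)
    return {
        "webserver.web_server_master_timeout": value,
        "webserver.web_server_worker_timeout": value,
    }


def apply_overrides(environment_name: str, options: Dict[str, str]) -> None:
    """Send the overrides to an existing MWAA environment (this starts an environment update)."""
    import boto3  # imported here so building the dict does not require boto3

    client = boto3.client("mwaa")
    client.update_environment(Name=environment_name, AirflowConfigurationOptions=options)


print(timeout_overrides(480))
```

Usage would be along the lines of `apply_overrides("my-mwaa-env", timeout_overrides(480))`, where `my-mwaa-env` is a placeholder environment name.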
