Apache Airflow - Slow to parse SQL queries on AWS MWAA
Published: 2021-04-10 05:04:57
Question:
I'm trying to build a DAG on AWS MWAA that exports data from Postgres (RDS) to S3. It exports 385 tables in total. As soon as MWAA tries to parse all the queries in my task, something goes wrong: the DAG gets stuck in the running state and never starts my task.
Basically, the process will:
- Load the table schema
- Rename certain columns
- Export the data to S3
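The question does not show `helper.masking_pii`. The sketch below is a purely hypothetical version of the column-renaming step. It turns each column name into a select-list expression, quotes every identifier, and replaces PII columns with Postgres's `md5()` hash under their original name. Hashing is an assumption here, not the poster's actual masking rule.

```python
def masking_pii(columns, pii_columns):
    """Hypothetical stand-in for helper.masking_pii: one select-list
    expression per column, with PII columns hashed under the same name."""
    pii = {c.lower() for c in pii_columns}  # case-insensitive match
    expressions = []
    for column in columns:
        # Quote the identifier, doubling any embedded double quotes.
        quoted = '"' + column.replace('"', '""') + '"'
        if column.lower() in pii:
            expressions.append(f"md5({quoted}::text) AS {quoted}")
        else:
            expressions.append(quoted)
    return expressions
```

For example, `masking_pii(["id", "Email"], ["email"])` yields `['"id"', 'md5("Email"::text) AS "Email"']`, which can be joined into the `select` list.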
Functions
def export_to_s3(dag, conn, db, pg_hook, export_date, s3_bucket, schemas):
    # `helper` holds get_tables/get_table_schema (below); PII_COLS is defined elsewhere.
    tasks = []
    run_queries = []
    for schema, features in schemas.items():
        # Use the configured table list, or look up every base table in the schema.
        t = features.get("tables")
        if t:
            tables = t
        else:
            tables = helper.get_tables(pg_hook, schema).table_name.tolist()
        is_full_export = features.get("full")
        for table in tables:
            # One information_schema query per table, executed while the DAG is built (parse time).
            columns = helper.get_table_schema(
                pg_hook, table, schema
            ).column_name.tolist()
            masked_columns = helper.masking_pii(columns, pii_columns=PII_COLS)
            masked_columns_str = ",\n".join(masked_columns)
            if is_full_export:
                statement = f'select {masked_columns_str} from {db}.{schema}."{table}"'
            else:
                # Partial export: a random sample of 10,000 rows.
                statement = f'select {masked_columns_str} from {db}.{schema}."{table}" order by random() limit 10000'
            s3_bucket_key = export_date + "_" + schema + "_" + table + ".csv"
            sql_export = f"""
            SELECT * from aws_s3.query_export_to_s3(
                '{statement}',
                aws_commons.create_s3_uri(
                    '{s3_bucket}',
                    '{s3_bucket_key}',
                    'ap-southeast-2'),
                options := 'FORMAT csv, DELIMITER $$|$$'
            )""".strip()
            run_queries.append(sql_export)
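`statement` is pasted between single quotes, so the generated SQL breaks if a masked column expression ever contains a single quote. The sketch below does the same string building with explicit quoting. It assumes `standard_conforming_strings` is on, which is the Postgres default. `build_export_sql`, `quote_ident` and `quote_literal` are hypothetical names. The sketch also drops the `db.` prefix, because Postgres only accepts a database qualifier that names the current database.

```python
def quote_ident(name: str) -> str:
    """Double-quote a Postgres identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def quote_literal(text: str) -> str:
    """Single-quote a Postgres string literal, doubling embedded single quotes
    (valid when standard_conforming_strings is on, the default)."""
    return "'" + text.replace("'", "''") + "'"


def build_export_sql(schema, table, select_list, s3_bucket, export_date,
                     full=True, sample_size=10000, region="ap-southeast-2"):
    """Builds one aws_s3.query_export_to_s3 call for a single table."""
    columns = ",\n".join(select_list)
    statement = f"select {columns} from {quote_ident(schema)}.{quote_ident(table)}"
    if not full:
        # Partial export: a random sample, as in the original code.
        statement += f" order by random() limit {int(sample_size)}"
    s3_key = f"{export_date}_{schema}_{table}.csv"
    return (
        "SELECT * from aws_s3.query_export_to_s3(\n"
        f"    {quote_literal(statement)},\n"
        f"    aws_commons.create_s3_uri({quote_literal(s3_bucket)}, "
        f"{quote_literal(s3_key)}, {quote_literal(region)}),\n"
        "    options := 'FORMAT csv, DELIMITER $$|$$'\n"
        ")"
    )
```

With a column expression such as `coalesce(note, 'n/a') AS note`, the embedded quotes come out doubled (`''n/a''`) inside the outer literal, so the export call stays valid SQL.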
def get_table_schema(pg_hook, table_name, table_schema):
    """ Gets the schema details of a given table in a given schema."""
    query = """
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_schema = '{0}'
    AND table_name = '{1}'
    order by ordinal_position
    """.format(table_schema, table_name)
    df_schema = pg_hook.get_pandas_df(query)
    return df_schema


def get_tables(pg_hook, schema):
    """ Lists the base tables in a given schema, skipping _sdc_rejected."""
    query = """
    select table_name from information_schema.tables
    where table_schema = '{}' and table_type = 'BASE TABLE' and table_name != '_sdc_rejected' """.format(schema)
    df_schema = pg_hook.get_pandas_df(query)
    return df_schema
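Both helpers splice names into the query with `str.format`. The sketch below performs the same lookups with bound parameters instead. It assumes the `get_pandas_df(sql, parameters=...)` signature of Airflow's `DbApiHook` and psycopg2's `%s` placeholders. This version still issues one query per table, so it fixes quoting, not the number of round trips.

```python
def get_table_schema(pg_hook, table_name, table_schema):
    """Gets the column names and types of one table, in column order."""
    query = """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = %s
          AND table_name = %s
        ORDER BY ordinal_position
    """
    # The driver (psycopg2 for Postgres) substitutes %s safely; no str.format.
    return pg_hook.get_pandas_df(query, parameters=(table_schema, table_name))


def get_tables(pg_hook, schema):
    """Lists the base tables of a schema, skipping _sdc_rejected."""
    query = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = %s
          AND table_type = 'BASE TABLE'
          AND table_name != '_sdc_rejected'
    """
    return pg_hook.get_pandas_df(query, parameters=(schema,))
```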
Task
    # Continuation of export_to_s3: a single task runs every statement in run_queries.
    task = PostgresOperator(
        sql=run_queries,
        postgres_conn_id=conn,
        task_id="export_to_s3",
        dag=dag,
        autocommit=True,
    )
    tasks.append(task)
    return tasks
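In this code the whole `run_queries` list, and every `information_schema` lookup behind it, is built while the DAG file is being parsed. That is consistent with the roughly 159 s parse time in the output below. One alternative, not the fix taken in the answer, is to defer this work to task run time, for example from a `PythonOperator` callable.

The sketch below makes several assumptions:
- The hook is a DB-API hook exposing `get_records(sql, parameters=...)` and `run(sql, autocommit=..., parameters=...)`.
- It fetches all columns of a schema in a single query.
- It passes the export query text as a bound parameter instead of quoting it by hand.

Masking and sampling are omitted. `export_schema_at_runtime` is a hypothetical name.

```python
def quote_ident(name):
    """Double-quote a Postgres identifier, doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def export_schema_at_runtime(pg_hook, schema, export_date, s3_bucket,
                             region="ap-southeast-2"):
    """Hypothetical python_callable body: looks up columns and runs the exports
    when the task executes, so parsing the DAG file never touches the database."""
    # One round trip for all base-table columns in the schema,
    # instead of one information_schema query per table.
    rows = pg_hook.get_records(
        """
        SELECT c.table_name, c.column_name
        FROM information_schema.columns c
        JOIN information_schema.tables t
          ON t.table_schema = c.table_schema AND t.table_name = c.table_name
        WHERE c.table_schema = %s
          AND t.table_type = 'BASE TABLE'
          AND c.table_name != '_sdc_rejected'
        ORDER BY c.table_name, c.ordinal_position
        """,
        parameters=(schema,),
    )
    columns_by_table = {}  # dicts keep insertion order (Python 3.7+)
    for table_name, column_name in rows:
        columns_by_table.setdefault(table_name, []).append(column_name)

    export_sql = """
        SELECT * FROM aws_s3.query_export_to_s3(
            %s,
            aws_commons.create_s3_uri(%s, %s, %s),
            options := 'FORMAT csv, DELIMITER $$|$$'
        )
    """
    keys = []
    for table_name, columns in columns_by_table.items():
        select_list = ", ".join(quote_ident(c) for c in columns)
        statement = f"select {select_list} from {quote_ident(schema)}.{quote_ident(table_name)}"
        key = f"{export_date}_{schema}_{table_name}.csv"
        # The query text travels as a bound parameter, so no manual quote escaping.
        pg_hook.run(export_sql, autocommit=True,
                    parameters=(statement, s3_bucket, key, region))
        keys.append(key)
    return keys
```

The DAG file then only declares one cheap task per schema. The trade-off is that the table list is no longer visible as separate statements in the rendered task.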
Airflow list_dags output
DAGS
-------------------------------------------------------------------
mydag
-------------------------------------------------------------------
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 1
Total task number: 3
DagBag parsing time: 159.94030800000002
-----------------------------------------------------+--------------------+---------+----------
file | duration | dag_num | task_num
-----------------------------------------------------+--------------------+---------+----------
/mydag.py | 159.05215199999998 | 1 | 3
/ActivationPriorityCallList/CallList_Generator.py | 0.878734 | 0 | 0
/ActivationPriorityCallList/CallList_Preprocessor.py | 0.00744 | 0 | 0
/ActivationPriorityCallList/CallList_Emailer.py | 0.001154 | 0 | 0
/airflow_helperfunctions.py | 0.000828 | 0 | 0
-----------------------------------------------------+--------------------+---------+----------
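Observations
If I enable loading for just one table in the task, it runs fine, but it fails when loading is enabled for all tables. The behavior is the same when I run Airflow from Docker pointed at RDS.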
(Screenshot of the airflow list_dags output)
Answer 1:
The problem was solved when I changed these values on MWAA:
- webserver.web_server_master_timeout
- webserver.web_server_worker_timeout
The default value was 30; I changed it to 480.
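The same two options can be written as a small mapping. On MWAA they are set as Airflow configuration options on the environment. For the Docker setup mentioned in the question, Airflow also reads any option from an `AIRFLOW__{SECTION}__{KEY}` environment variable. This is a minimal sketch; the environment name in the commented-out boto3 call is a placeholder.

```python
def airflow_env_var(option: str) -> str:
    """Maps a 'section.key' config option to the environment variable Airflow
    reads for it: AIRFLOW__{SECTION}__{KEY}."""
    section, key = option.split(".", 1)
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# The two webserver timeouts from the answer, raised to 480 seconds.
TIMEOUT_OPTIONS = {
    "webserver.web_server_master_timeout": "480",
    "webserver.web_server_worker_timeout": "480",
}

# Local/Docker Airflow: pass these as environment variables to the containers.
docker_env = {airflow_env_var(k): v for k, v in TIMEOUT_OPTIONS.items()}

# MWAA: the same mapping can be passed as configuration options, e.g.
# boto3.client("mwaa").update_environment(
#     Name="my-mwaa-env", AirflowConfigurationOptions=TIMEOUT_OPTIONS)
```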
Link to the documentation:
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html