BigQueryExecuteQueryOperator timeout issue (how to increase the timeout)

Posted: 2021-12-06 19:10:14

Question:

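I have an issue in Airflow. In short: I'm trying to move from version 1.10.14 to 2.2.2. However, when I replace BigQueryOperator with BigQueryExecuteQueryOperator, I get a timeout issue. I tried to increase jobTimeoutMs as the BigQuery API documentation describes, but I still see the issue.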

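I see this issue when a job runs for longer than 1 minute. Meanwhile, I have not seen this kind of issue with the old BigQuery operator on Airflow 1.10.14, using a Google Cloud connection with the same configuration.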

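    from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

    mobile_push_stat = BigQueryExecuteQueryOperator(
        task_id="mobile_push_stat",
        sql="/sql/updater/mobile_push_stat.sql",
        use_legacy_sql=False,
        # Extra JobConfiguration fields for the BigQuery Jobs API
        api_resource_configs={"jobTimeoutMs": "3600000"},
        gcp_conn_id="bigquery_work",
    )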

Logs

*** Reading local file: /srv/airflow/logs/ExtensionPushStat/mobile_push_stat/2021-12-05T06:00:00+00:00/29.log
[2021-12-06, 17:33:11 UTC] taskinstance.py:1035 INFO - Dependencies all met for <TaskInstance: ExtensionPushStat.mobile_push_stat scheduled__2021-12-05T06:00:00+00:00 [queued]>
[2021-12-06, 17:33:11 UTC] taskinstance.py:1035 INFO - Dependencies all met for <TaskInstance: ExtensionPushStat.mobile_push_stat scheduled__2021-12-05T06:00:00+00:00 [queued]>
[2021-12-06, 17:33:11 UTC] taskinstance.py:1241 INFO - 
--------------------------------------------------------------------------------
[2021-12-06, 17:33:11 UTC] taskinstance.py:1242 INFO - Starting attempt 29 of 29
[2021-12-06, 17:33:11 UTC] taskinstance.py:1243 INFO - 
--------------------------------------------------------------------------------
[2021-12-06, 17:33:11 UTC] taskinstance.py:1262 INFO - Executing <Task(BigQueryExecuteQueryOperator): mobile_push_stat> on 2021-12-05 06:00:00+00:00
[2021-12-06, 17:33:11 UTC] base_task_runner.py:141 INFO - Running on host: airflow-vm-v2
[2021-12-06, 17:33:11 UTC] base_task_runner.py:142 INFO - Running: ['airflow', 'tasks', 'run', 'ExtensionPushStat', 'mobile_push_stat', 'scheduled__2021-12-05T06:00:00+00:00', '--job-id', '3235', '--raw', '--subdir', 'DAGS_FOLDER/ExtensionPushStat.py', '--cfg-path', '/tmp/tmpk6p4qql0', '--error-file', '/tmp/tmpwz3z5o0y']
[2021-12-06, 17:33:12 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat [2021-12-06, 17:33:12 UTC] dagbag.py:500 INFO - Filling up the DagBag from /srv/airflow/dags/ExtensionPushStat.py
[2021-12-06, 17:34:11 UTC] local_task_job.py:206 WARNING - Recorded pid 137796 does not match the current pid 137817
[2021-12-06, 17:34:11 UTC] process_utils.py:100 INFO - Sending Signals.SIGTERM to GPID 137817
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat Running <TaskInstance: ExtensionPushStat.mobile_push_stat scheduled__2021-12-05T06:00:00+00:00 [running]> on host airflow-vm-v2
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat Traceback (most recent call last):
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/bin/airflow", line 8, in <module>
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     sys.exit(main())
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/__main__.py", line 48, in main
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     args.func(args)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     return func(*args, **kwargs)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     return f(*args, **kwargs)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     _run_task_by_selected_method(args, dag, ti)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     _run_raw_task(args, ti)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     ti._run_raw_task(
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     return func(*args, session=session, **kwargs)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     self._execute_task_with_callbacks(context)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     result = self._execute_task(context, self.task)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     result = execute_callable(context=context)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/bigquery.py", line 693, in execute
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     job_id = self.hook.run_query(
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2325, in run_query
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     job = self.insert_job(configuration=configuration, project_id=self.project_id)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     return func(self, *args, **kwargs)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 1639, in insert_job
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     job.result()
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1450, in result
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     do_get_result()
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1440, in do_get_result
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     super(QueryJob, self).result(retry=retry, timeout=timeout)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py", line 727, in result
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/api_core/future/polling.py", line 130, in result
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     self._blocking_poll(timeout=timeout, **kwargs)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1199, in _blocking_poll
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     super(QueryJob, self)._blocking_poll(timeout=timeout, **kwargs)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/api_core/future/polling.py", line 108, in _blocking_poll
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     retry_(self._done_or_raise)(**kwargs)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     return retry_target(
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/api_core/retry.py", line 220, in retry_target
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     time.sleep(sleep)
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1413, in signal_handler
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat     raise AirflowException("Task received SIGTERM signal")
[2021-12-06, 17:34:11 UTC] base_task_runner.py:122 INFO - Job 3235: Subtask mobile_push_stat airflow.exceptions.AirflowException: Task received SIGTERM signal
[2021-12-06, 17:34:11 UTC] process_utils.py:66 INFO - Process psutil.Process(pid=137817, status='terminated', exitcode=1, started='17:33:11') (137817) terminated with exit code 1
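The timestamps in the log place the SIGTERM exactly one minute after the attempt started. This matches the "longer than 1 minute" symptom. Below is a minimal sketch of that arithmetic. It assumes the `YYYY-MM-DD, HH:MM:SS UTC` timestamp format shown in the log, and the helper name `log_time` is hypothetical.

```python
from datetime import datetime

# Timestamp format used by the log lines above, e.g. "2021-12-06, 17:33:11 UTC".
LOG_TS_FORMAT = "%Y-%m-%d, %H:%M:%S UTC"


def log_time(line: str) -> datetime:
    """Parse the bracketed timestamp at the start of a log line (hypothetical helper)."""
    stamp = line[line.index("[") + 1 : line.index("]")]
    return datetime.strptime(stamp, LOG_TS_FORMAT)


started = log_time("[2021-12-06, 17:33:11 UTC] taskinstance.py:1242 INFO - Starting attempt 29 of 29")
sigterm = log_time("[2021-12-06, 17:34:11 UTC] process_utils.py:100 INFO - Sending Signals.SIGTERM to GPID 137817")

# Seconds between the start of the attempt and the SIGTERM.
elapsed = (sigterm - started).total_seconds()
print(f"SIGTERM sent {elapsed:.0f} s after the attempt started")
```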

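Update

This issue is not related to BigQuery or the Google provider.

The issue is related to two configuration parameters:

killed_task_cleanup_time - needs to be increased for long-running jobs
job_heartbeat_sec - matters when you want to clear the state of a failed task and rerun it from the UI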
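Both options can be set in airflow.cfg. They can also be set through environment variables, using Airflow's documented `AIRFLOW__{SECTION}__{KEY}` naming convention. Below is a minimal sketch that builds those variable names. It assumes `killed_task_cleanup_time` lives in the `[core]` section and `job_heartbeat_sec` in `[scheduler]`, as in the Airflow 2.2 configuration reference. The values are illustrative placeholders, not tuned settings.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Return the environment variable name that overrides [section] key in airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Placeholder values for illustration only; choose values for your own deployment.
overrides = {
    ("core", "killed_task_cleanup_time"): 3600,
    ("scheduler", "job_heartbeat_sec"): 5,
}

# Environment variable values are always strings.
env = {airflow_env_var(section, key): str(value) for (section, key), value in overrides.items()}

for name, value in env.items():
    print(f"export {name}={value}")
```

The variables have to be present in the environment of the Airflow processes (scheduler, workers) when those processes start.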


Answer 1:

BigQueryExecuteQueryOperator is deprecated (see the source code). You should use BigQueryInsertJobOperator instead.

Based on the API parameters and the operator's source code, the syntax should be:

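from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

mobile_push_stat = BigQueryInsertJobOperator(
    task_id="mobile_push_stat",
    gcp_conn_id="bigquery_work",
    # `configuration` mirrors the BigQuery Jobs API JobConfiguration resource;
    # query options are nested under the "query" key.
    configuration={
        "query": {
            "query": "/sql/updater/mobile_push_stat.sql",
            "useLegacySql": False,
            "timeoutMs": 3600000,
        },
    },
)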
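For comparison, the BigQuery REST reference documents the job-level limit as `jobTimeoutMs`. This field sits on the top-level JobConfiguration, next to the `query` block, and its int64 value is serialized as a string in JSON. Below is a minimal sketch of building such a `configuration` dict, assuming a one-hour limit. The helper name `query_job_configuration` is hypothetical. The SQL text uses the Jinja include from the asker's later comment, which Airflow would render at runtime.

```python
import json


def query_job_configuration(sql: str, timeout_seconds: int, use_legacy_sql: bool = False) -> dict:
    """Build a BigQuery JobConfiguration dict for a query job (hypothetical helper).

    jobTimeoutMs is placed at the top level of the job configuration, not inside
    the "query" block; it is an int64, passed here as a string as in JSON.
    """
    return {
        "query": {
            "query": sql,
            "useLegacySql": use_legacy_sql,
        },
        "jobTimeoutMs": str(timeout_seconds * 1000),
    }


config = query_job_configuration(
    "{% include '/sql/updater/mobile_push_stat.sql' %}",
    timeout_seconds=3600,
)
print(json.dumps(config, indent=2))
```

The resulting dict would be passed as `configuration=config` to BigQueryInsertJobOperator. Per the update to the question, this BigQuery-side limit does not address the SIGTERM sent by Airflow itself.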

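Edit: Note the limitations around timeoutMs. The value of 3600000 you chose may be too high:

You can request a longer timeout period in the timeoutMs field. However, the call is not guaranteed to wait for the specified timeout; it typically returns after around 200 seconds (200,000 milliseconds), even if the query is not complete.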
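Here is the same limitation in numbers. If each call returns after roughly 200,000 ms, covering a requested wait of 3,600,000 ms would take about 18 such calls. The 200,000 ms figure is the approximate value from the quoted documentation, not a fixed limit.

```python
import math

REQUESTED_TIMEOUT_MS = 3_600_000  # the value used in the answer's configuration
TYPICAL_RETURN_MS = 200_000       # approximate figure from the quoted BigQuery docs

# Number of ~200 s calls needed to cover the full requested wait.
calls = math.ceil(REQUESTED_TIMEOUT_MS / TYPICAL_RETURN_MS)
print(f"about {calls} calls of ~{TYPICAL_RETURN_MS // 1000} s each to cover one hour")
```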

Comments:

Thanks for your reply @Elad, but this doesn't work either.

Can you be more specific?

Sure. pastebin.com/m5smUQNP

This looks like an issue with the Google library itself. Are you using the latest stable version of the library that Airflow allows? What happens if you remove timeoutMs?

Here is my edited task: `mobile_push_stat = BigQueryInsertJobOperator( task_id="mobile_push_stat", configuration={"query": {"query": "{% include '/sql/updater/mobile_push_stat.sql' %}", "useLegacySql": False}, "jobTimeoutMs": "3600000"}, location="US", gcp_conn_id="bigquery_work", )`

Airflow 2.2.2, apache-airflow-providers-google==6.1.0
