Airflow Scheduler keeps crashing, DB connection error (Google Composer)

Posted: 2018-06-28 09:00:13

I have been using Google Composer for a while now (composer-0.5.2-airflow-1.9.0) and have run into problems with the Airflow scheduler. The scheduler container crashes from time to time, and it can end up in a locked state where it cannot start any new tasks (DB connection errors), so I have to recreate the whole Composer environment. This time there was a CrashLoopBackOff and the scheduler pod could no longer restart. The error is very similar to ones I have hit before. Here is the traceback from Stackdriver:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 826, in scheduler
    job.run()
  File "/usr/local/lib/python2.7/site-packages/airflow/jobs.py", line 198, in run
    self._execute()
  File "/usr/local/lib/python2.7/site-packages/airflow/jobs.py", line 1549, in _execute
    self._execute_helper(processor_manager)
  File "/usr/local/lib/python2.7/site-packages/airflow/jobs.py", line 1594, in _execute_helper
    self.reset_state_for_orphaned_tasks(session=session)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/jobs.py", line 266, in reset_state_for_orphaned_tasks
    .filter(or_(*filter_for_tis), TI.state.in_(resettable_states))
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2783, in all
    return list(self)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2935, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2958, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 948, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 269, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1060, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1200, in _execute_context
    context)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1413, in _handle_dbapi_exception
    exc_info
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 508, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python2.7/site-packages/mysqldb/cursors.py", line 250, in execute
    self.errorhandler(self, exc, value)
  File "/usr/local/lib/python2.7/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction') [SQL: u'SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid \nFROM task_instance \nWHERE (task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.execution_date = %s OR task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.execution_date = %s OR task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.execution_date = %s OR task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.execution_date = %s OR task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.execution_date = %s OR task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.execution_date = %s) AND task_instance.state IN (%s, %s) FOR UPDATE'] [parameters: ('pb_write_event_tables_v2_dev2', 'check_table_chest_progressed', datetime.datetime(2018, 6, 26, 8, 0), 'pb_write_event_tables_v2_dev2', 'check_table_name_changed', datetime.datetime(2018, 6, 26, 8, 0), 'pb_write_event_tables_v2_dev2', 'check_table_registered', datetime.datetime(2018, 6, 26, 8, 0), 'pb_write_event_tables_v2_dev2', 'check_table_unit_leveled_up', datetime.datetime(2018, 6, 26, 8, 0), 'pb_write_event_tables_v2_dev2', 'check_table_virtual_currency_earned', datetime.datetime(2018, 6, 26, 8, 0), 'pb_write_event_tables_v2_dev2', 'check_table_virtual_currency_spent', datetime.datetime(2018, 6, 26, 8, 0), u'scheduled', u'queued')] (Background on this error at: http://sqlalche.me/e/e3q8)

The technical RDBMS errors are beyond me. However, this is an out-of-the-box Google Composer environment with default settings, so I am wondering whether anyone else has run into similar problems or knows what is going on. I know Composer uses Google Cloud SQL as the database, apparently(?) with a MySQL backend.
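For context on the 1205 error: it means the scheduler's SELECT ... FOR UPDATE (from reset_state_for_orphaned_tasks in the traceback) waited longer than MySQL's innodb_lock_wait_timeout for row locks held by another transaction. A minimal diagnostic sketch for inspecting this, assuming you substitute the real sql_alchemy_conn value from your environment's airflow.cfg (the connection string below is a placeholder):

from sqlalchemy import create_engine

# Placeholder connection string: use the sql_alchemy_conn value from your
# environment's airflow.cfg instead.
engine = create_engine("mysql://airflow:<password>@airflow-sqlproxy-service/airflow")

with engine.connect() as conn:
    # Seconds a transaction waits for a row lock before failing with 1205.
    for name, value in conn.execute("SHOW VARIABLES LIKE 'innodb_lock_wait_timeout'"):
        print(name, value)
    # Transactions currently blocked waiting on row locks.
    for row in conn.execute(
        "SELECT trx_id, trx_state, trx_started, trx_query "
        "FROM information_schema.innodb_trx WHERE trx_state = 'LOCK WAIT'"
    ):
        print(dict(row))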

The Airflow scheduler image is gcr.io/cloud-airflow-releaser/airflow-worker-scheduler-1.9.0:cloud_composer_service_2018-06-19-RC3.

I should add that I did not experience this scheduler problem with a self-made Airflow Kubernetes setup, but back then I was using a more recent Airflow version with PostgreSQL.

【Comments】:

Another (but possibly unrelated) connection error that I sometimes get in tasks is sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (2005, "Unknown MySQL server host 'airflow-sqlproxy-service' (110)") (Background on this error at: http://sqlalche.me/e/e3q8). Yet another (but possibly unrelated) error shows up in the Redis connection when tasks are stuck indefinitely in the "queued" state. Those tasks do not restart even after their state is cleared and the DAG is in the "running" state. When I try to start them manually from the cleared state, I get this error: OperationalError: Error -2 connecting to airflow-redis-service:6379. Name or service not known.

We have exactly the same error, our Kubernetes workloads keep crashing.

Side question - where do you see these logs?

In the general logs in the Google Cloud console; you can select the Composer environment and/or the Kubernetes cluster/workloads that run it.
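As a side note on the stuck "queued" tasks mentioned above: a minimal diagnostic sketch, assuming shell access to a scheduler or worker pod, that lists the affected task instances via Airflow's own ORM (standard Airflow 1.9 modules, nothing Composer-specific):

from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

# List task instances sitting in the "queued" state so they can be
# inspected (and, if appropriate, cleared) by hand.
session = settings.Session()
stuck = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
for ti in stuck:
    print(ti.dag_id, ti.task_id, ti.execution_date)
session.close()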

【Answer 1】:

This may be caused by resource overload:

- when you load/update Python packages, or
- when you load DAGs.

To prevent this, you can use asynchronous DAG loading or have the environment use a larger machine type.
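Independently of the Composer-side async loading option, a common way to reduce the DAG-loading cost this answer alludes to is keeping DAG files cheap to import, deferring expensive work into task callables. A small illustration under Airflow 1.9 conventions; the DAG and task names are hypothetical:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Anti-pattern: an expensive call at module level (e.g. an API request)
# would run on every scheduler parse loop and inflate DAG load time.

def check_table(**context):
    # Do the expensive work inside the task, where it runs once per task
    # instance instead of once per scheduler parse.
    pass  # placeholder for the real check

dag = DAG(
    dag_id="example_cheap_to_parse",  # hypothetical DAG for illustration
    start_date=datetime(2018, 6, 26),
    schedule_interval="@daily",
)

check = PythonOperator(
    task_id="check_table",
    python_callable=check_table,
    provide_context=True,
    dag=dag,
)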

Additionally, I would recommend using the latest version, composer-1.10.6-airflow-1.10.6, since this problem has been fixed there.

