airflow使用本地时区

Posted cord

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了airflow使用本地时区相关的知识,希望对你有一定的参考价值。

? 在airflow中使用的时间是utc时间,而更多时候我们希望的是使用本地时间,于是在定义airflow定时任务的时候,涉及到了时间的转换。

1.python中本地时间和utc时间的转换

查看国内可用时区:

>>> import pytz
>>> pytz.country_timezones(‘cn‘)
[‘Asia/Shanghai‘, ‘Asia/Urumqi‘]

方式一:

tz = pytz.timezone(‘Asia/Shanghai‘)
naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S")
local_dt = tz.localize(naive, is_dst=None)
utc_dt = local_dt.astimezone(pytz.utc)

参考链接: https://stackoverflow.com/questions/79797/how-do-i-convert-local-time-to-utc-in-python

方式二:

tz = pytz.timezone(‘Asia/Shanghai‘)
dt = datetime(2018, 6, 13, 17, 40, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc)

2.Airflow中通过时间转换使用本地时间

这里涉及到一个问题,如果只是将本地时间转换成了utc时间,那么在运行过程中airflow会抛出以下错误:

Can‘t subtract offset-naive and offset-aware datetimes

解决办法是当将时间转换为utc时间之后将其时区属性设为None:

 dt.replace(tzinfo=None)

参考链接: https://stackoverflow.com/questions/796008/cant-subtract-offset-naive-and-offset-aware-datetimes

完整示例DAG如下:

import time
import airflow
import pytz
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import timedelta, datetime

default_args = {
    ‘owner‘: ‘cord‘,
    ‘depends_on_past‘: True,
    ‘email‘: [[email protected]],
    ‘email_on_failure‘: False,
    ‘email_on_retry‘: False,
    ‘retries‘: 1,
    ‘retry_delay‘: timedelta(minutes=5),
}

tz = pytz.timezone(‘Asia/Shanghai‘)
# naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S")
# local_dt = tz.localize(naive, is_dst=None)
# utc_dt = local_dt.astimezone(pytz.utc).replace(tzinfo=None)

dt = datetime(2018, 6, 13, 17, 40, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)

dag = DAG(
    ‘demo‘,
    default_args=default_args,
    description=‘my DAG‘,
    schedule_interval=‘* */3 * * *‘,
    start_date=utc_dt
)

def test_func(str):
    print(str)

task = PythonOperator(
    task_id=‘hello‘,
    python_callable=test_func,
    op_kwargs={‘str‘: ‘hello world‘},
    dag=dag
)

以上是关于airflow使用本地时区的主要内容,如果未能解决你的问题,请参考以下文章

Airflow 中文文档:保护连接

入门Airflow 使用Docker在本地快速搭建Airflow

入门Airflow 使用Docker在本地快速搭建Airflow

入门Airflow 使用Docker在本地快速搭建Airflow

(Django)气流中的 ORM - 有可能吗?

Airflow 中文文档:写日志