使用 jinja 模板中的 Airflow 连接

Posted

技术标签:

【中文标题】使用 jinja 模板中的 Airflow 连接【英文标题】:use Airflow connection from a jinja template 【发布时间】:2021-01-21 11:25:53 【问题描述】:

我正在尝试使用环境变量将 DB 参数传递给 BashOperator,但我找不到任何文档/示例如何使用来自 Jinja 模板的连接。

所以我正在寻找类似于变量的东西

echo  var.value.<variable_name> 

【问题讨论】:

【参考方案1】:

对于 Airflow >= 2.2.0

假设您有 conn id test_conn,您可以通过以下方式直接使用宏:

conn.test_conn 所以你可以得到任何连接属性,比如:

conn.test_conn.host conn.test_conn.login conn.test_conn.password 等等。

对于气流 :

没有现成的宏,但是您可以创建自定义宏来解决这个问题。

连接示例:

创建宏:

def get_host(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.host

def get_schema(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.schema

def get_login(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.login

在 DAG 中使用它们:

def print_function(**context):
    print(f"host=context['host'] schema=context['schema'] login=context['login']")

user_macros = 
    'get_host': get_host,
    'get_schema': get_schema,
    'get_login': get_login,


with DAG(
    dag_id='connection',
    default_args=default_args,
    schedule_interval=None,
    user_defined_macros=user_macros,
) as dag:

# Example how to use as function
python_op = PythonOperator( 
    task_id='python_task',
    provide_context=True,
    python_callable=print_function,
    op_kwargs=
        'host': get_host("test_conn"),
        'schema': get_schema("test_conn"),
        'login': get_login("test_conn"),
    
)

# Example how to use as Jinja string
bash_op = BashOperator( 
    task_id='bash_task',
    bash_command='echo  get_host("test_conn")   get_schema("test_conn")   get_login("test_conn")  ',
)

PythonOperator 的渲染示例:

BashOperator 渲染示例:

一般说明: 这段代码所做的是创建一个自定义函数func() 用作user_defined_macros,从而提供使用它的能力,就像这个宏是由Airflow 本身定义的一样。 您可以通过以下方式访问模板: func() ,如示例中函数允许接受参数所示。

注意您可以为连接对象中的所有字段创建此类函数。

谨慎使用它,将密码作为文本传递可能不是一个好主意。

【讨论】:

【参考方案2】:

我的 PR 添加了 conn.my_conn_id.login 语法,它将在气流 2.2.0 中可用(尚未发布截至 2021 年 9 月 22 日)。

在此处查看templates reference 的未发布文档

对于 2.1.4 及更早版本:

改进以前的答案,

为每个 DAG 定义宏:conn.&lt;conn_id&gt;

您可以使用以下宏获得conn.&lt;connection_name&gt;.host 语法:

class ConnectionGrabber:
    def __getattr__(self, name):
        return  Connection.get_connection_from_secrets(name)
dag = DAG(user_defined_macros='connection': ConnectionGrabber()

将名称 connection 注入到 jinja 模板上下文中,connection 是一个 ConnectionGrabber 实例。此ConnectionGrabber 提供动态/托管属性,因此当您请求属性my_conn_id(如connection.my_conn_id)时,它将执行使用airflow.models.Connection.get_connection_from_secrets 的查找并返回,从那里您可以使用a.m.Connection 属性,如hostloginpassword

在 jinja 模板中使用bash_command='echo connection.mssql.host ' 访问连接mssql 的完整示例:

from airflow.models import DAG,Connection
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago


class ConnectionGrabber:
    def __getattr__(self, name):
        return  Connection.get_connection_from_secrets(name)


args = 'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)

dag = DAG(
    dag_id='test_connection',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    user_defined_macros='connection': ConnectionGrabber()
)

task = BashOperator(task_id='read_connection', bash_command='echo connection.mssql.host ', dag=dag)

在插件中定义宏:macros.conn.value.&lt;conn_id&gt;

如果您想在所有 DAG 中使用此宏,您可以将其包装在插件中,如下所示:

# $AIRFLOW_HOME/plugins/connection_macro.py
from airflow.plugins_manager import AirflowPlugin
from airflow.models import Connection


class ConnectionGrabber:
    __name__ = "value"
    def __str__(self):
        return self.__name__
    def __getattr__(self, name):
        return  Connection.get_connection_from_secrets(name)

class MacrosPlugin(AirflowPlugin):
    name = "conn"
    macros = [ConnectionGrabber()]

检查插件是否可以加载airflow plugins

airflow plugins
name | source                    | macros
=====+===========================+=======
conn | $PLUGINS_FOLDER/macros.py | value

然后你可以像这样在你的神社模板中使用 macros.conn.value.&lt;conn_id&gt;.host

   task = BashOperator(task_id='read_connection', bash_command='echo macros.conn.value.mssql.host = macros.conn.value.mssql.host ', dag=dag)

我还打开了一个issue 以原生添加conn.value.&lt;conn_id&gt; 语法

【讨论】:

以上是关于使用 jinja 模板中的 Airflow 连接的主要内容,如果未能解决你的问题,请参考以下文章

Airflow Jinja 渲染模板

在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用

Airflow Jinja模板无法与自定义操作员一起使用

airflow任务运行抛出jinja2.exceptions.TemplateNotFound

jinja2批量生成python脚本

Airflow 中文文档:使用操作器