使用 jinja 模板中的 Airflow 连接
Posted
技术标签:
【中文标题】使用 jinja 模板中的 Airflow 连接【英文标题】:use Airflow connection from a jinja template 【发布时间】:2021-01-21 11:25:53 【问题描述】:我正在尝试使用环境变量将 DB 参数传递给 BashOperator,但我找不到任何文档/示例如何使用来自 Jinja 模板的连接。
所以我正在寻找类似于变量的东西
echo var.value.<variable_name>
【问题讨论】:
【参考方案1】:对于 Airflow >= 2.2.0:
假设您有 conn id test_conn
,您可以通过以下方式直接使用宏:
conn.test_conn
所以你可以得到任何连接属性,比如:
conn.test_conn.host
、 conn.test_conn.login
、 conn.test_conn.password
等等。
对于气流 :
没有现成的宏,但是您可以创建自定义宏来解决这个问题。
连接示例:
创建宏:
def get_host(conn_id):
connection = BaseHook.get_connection(conn_id)
return connection.host
def get_schema(conn_id):
connection = BaseHook.get_connection(conn_id)
return connection.schema
def get_login(conn_id):
connection = BaseHook.get_connection(conn_id)
return connection.login
在 DAG 中使用它们:
def print_function(**context):
print(f"host=context['host'] schema=context['schema'] login=context['login']")
user_macros =
'get_host': get_host,
'get_schema': get_schema,
'get_login': get_login,
with DAG(
dag_id='connection',
default_args=default_args,
schedule_interval=None,
user_defined_macros=user_macros,
) as dag:
# Example how to use as function
python_op = PythonOperator(
task_id='python_task',
provide_context=True,
python_callable=print_function,
op_kwargs=
'host': get_host("test_conn"),
'schema': get_schema("test_conn"),
'login': get_login("test_conn"),
)
# Example how to use as Jinja string
bash_op = BashOperator(
task_id='bash_task',
bash_command='echo get_host("test_conn") get_schema("test_conn") get_login("test_conn") ',
)
PythonOperator
的渲染示例:
为BashOperator
渲染示例:
一般说明:
这段代码所做的是创建一个自定义函数func()
用作user_defined_macros
,从而提供使用它的能力,就像这个宏是由Airflow 本身定义的一样。
您可以通过以下方式访问模板: func()
,如示例中函数允许接受参数所示。
注意您可以为连接对象中的所有字段创建此类函数。
谨慎使用它,将密码作为文本传递可能不是一个好主意。
【讨论】:
【参考方案2】:我的 PR 添加了 conn.my_conn_id.login
语法,它将在气流 2.2.0 中可用(尚未发布截至 2021 年 9 月 22 日)。
在此处查看templates reference 的未发布文档
对于 2.1.4 及更早版本:
改进以前的答案,
为每个 DAG 定义宏:conn.<conn_id>
您可以使用以下宏获得conn.<connection_name>.host
语法:
class ConnectionGrabber:
def __getattr__(self, name):
return Connection.get_connection_from_secrets(name)
dag = DAG(user_defined_macros='connection': ConnectionGrabber()
将名称 connection
注入到 jinja 模板上下文中,connection
是一个 ConnectionGrabber
实例。此ConnectionGrabber
提供动态/托管属性,因此当您请求属性my_conn_id
(如connection.my_conn_id
)时,它将执行使用airflow.models.Connection.get_connection_from_secrets
的查找并返回,从那里您可以使用a.m.Connection
属性,如host
、login
、password
等
在 jinja 模板中使用bash_command='echo connection.mssql.host '
访问连接mssql
的完整示例:
from airflow.models import DAG,Connection
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
class ConnectionGrabber:
def __getattr__(self, name):
return Connection.get_connection_from_secrets(name)
args = 'owner': 'airflow', 'retries': 3, 'start_date': days_ago(2)
dag = DAG(
dag_id='test_connection',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
user_defined_macros='connection': ConnectionGrabber()
)
task = BashOperator(task_id='read_connection', bash_command='echo connection.mssql.host ', dag=dag)
在插件中定义宏:macros.conn.value.<conn_id>
如果您想在所有 DAG 中使用此宏,您可以将其包装在插件中,如下所示:
# $AIRFLOW_HOME/plugins/connection_macro.py
from airflow.plugins_manager import AirflowPlugin
from airflow.models import Connection
class ConnectionGrabber:
__name__ = "value"
def __str__(self):
return self.__name__
def __getattr__(self, name):
return Connection.get_connection_from_secrets(name)
class MacrosPlugin(AirflowPlugin):
name = "conn"
macros = [ConnectionGrabber()]
检查插件是否可以加载airflow plugins
airflow plugins
name | source | macros
=====+===========================+=======
conn | $PLUGINS_FOLDER/macros.py | value
然后你可以像这样在你的神社模板中使用 macros.conn.value.<conn_id>.host
task = BashOperator(task_id='read_connection', bash_command='echo macros.conn.value.mssql.host = macros.conn.value.mssql.host ', dag=dag)
我还打开了一个issue 以原生添加conn.value.<conn_id>
语法
【讨论】:
以上是关于使用 jinja 模板中的 Airflow 连接的主要内容,如果未能解决你的问题,请参考以下文章
在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用