使用 Apache 气流存储和访问密码

Posted

技术标签:

【中文标题】使用 Apache 气流存储和访问密码【英文标题】:Store and access password using Apache airflow 【发布时间】:2017-12-30 01:02:03 【问题描述】:

我们使用 Airflow 作为调度程序。我想在 DAG 中调用一个简单的 bash 运算符。 bash 脚本需要密码作为参数来进行进一步处理。

如何在 Airflow (config/variables/connection) 中安全地存储密码并在 dag 定义文件中访问它?

我是 Airflow 和 Python 的新手,因此我将不胜感激使用 sn-p 代码。

【问题讨论】:

【参考方案1】:

您可以将密码存储在 Hook 中 - 只要您设置了 fernet 密钥,该密码就会被加密。

以下是通过 UI 创建连接的方法:

然后:

要访问此密码:

from airflow.hooks.base_hook import BaseHook  # Deprecated in Airflow 2

connection = BaseHook.get_connection("username_connection")
password = connection.password # This is a getter that returns the unencrypted password.

Airflow 2 发布后的更新

airflow.hooks.base_hookhas been deprecated,您必须改用airflow.hooks.base

【讨论】:

这里是创建 postgres 连接字符串的方法。 connection_string = 'postgresql://' + str(connection.login) + ':' + str(connection.password) + '@' \ + str(connection.host) + ':' + str(connection.port).encode('utf-8') + '/' + str(connection.schema) 这可以在Operator 内完成吗(以便下游任务 可以使用conn_id 引用连接)?请参阅this 以进一步详细说明我的查询 我不明白如何仅仅实例化 Connection 对象会在Airflowmeta-db 中创建一个条目?难道我们不必像this 一样执行session.add(..) 来持久化它吗? 这根本不保持连接,有没有办法在 UI 之外做到这一点?只是像@y2k-shubham 所说的那样初始化一个对象,在范围之外什么都不做。 @y2k-shubham 我已经找到了一种正确的做事方式,并在下面发布了我的答案。【参考方案2】:
from airflow.hooks.base_hook import BaseHook
conn = BaseHook.get_connection('bigquery_connection')
print(conn.get_extra())

这些conn.get_extra() 将为您提供存储在连接中的设置的 JSON。

【讨论】:

【参考方案3】:

您可以将密码存储在气流变量中,https://airflow.incubator.apache.org/ui.html#variable-view

    在 UI 中创建一个带有 key&value 的变量,例如 mypass:XXX 导入变量from airflow.models import Variable MyPass = Variable.get("mypass") 将 MyPass 传递给您的 bash 脚本:
command = """
          echo " params.my_param "
          """



task = BashOperator(
        task_id='templated',
        bash_command=command,
        params='my_param': MyPass,
        dag=dag)

【讨论】:

@MatthijsBrouns 的评论不再正确(适用于 Airflow 1.9+)。变量存储在加密的数据库中。但是,请注意,它们的值在 Airflow 网络用户界面中以纯文本形式显示,如果您有机会在不应该看到该值的人面前四处浏览。【参考方案4】:

使用管理/连接选项卡中的 GUI。

真正有效的答案是,以编程方式在 Airflow 中保持连接,如下面的 sn-p 所示。

在下面的示例中,myservice 代表一些外部凭据缓存。

使用以下方法时,您可以将您在外部管理的连接存储在气流内部。无需从每个 dag/任务中轮询服务。相反,您可以依赖气流的连接机制,而您也不必失去 Airflow 暴露的 Operator(如果您的组织允许这样做)。

诀窍是使用airflow.utils.db.merge_conn 来处理您创建的连接对象的设置。

    from airflow.utils.db import provide_session, merge_conn




    creds = "user": myservice.get_user(), "pwd": myservice.get_pwd() 

    c = Connection(conn_id=f'your_airflow_connection_id_here',
                   login=creds["user"],
                   host=None)
    c.set_password(creds["pwd"])
    merge_conn(c)

merge_conn 是内置的,气流本身使用它来初始化空连接。但是它不会自动更新。为此,您必须使用自己的辅助函数。

from airflow.utils.db import provide_session

@provide_session
def store_conn(conn, session=None):
    from airflow.models import Connection
    if session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
        logging.info("Connection object already exists, attempting to remove it...")
        session.delete(session.query(Connection).filter(Connection.conn_id == conn.conn_id).first())

    session.add(conn)
    session.commit()

【讨论】:

【参考方案5】:

在这种情况下,我将使用 PythonOperator,您可以从中获取 Hook 在您的数据库连接上使用 hook = PostgresHook(postgres_conn_id=postgres_conn_id)。然后,您可以在此挂钩上调用get_connection,这将为您提供一个 Connection 对象,您可以从中获取数据库连接的主机、登录名和密码。

最后,例如使用subprocess.call(your_script.sh, connection_string) 将连接详细信息作为参数传递。

这种方法有点复杂,但它确实允许您在 Airflow 中保持数据库连接的加密。此外,您应该能够将此策略拉入一个单独的 Operator 类,继承 PythonOperator 的基本行为,但添加获取钩子和调用 bash 脚本的逻辑。

【讨论】:

【参考方案6】:

这是我用过的。

    def add_slack_token(ds, **kwargs):
        """"Add a slack token"""
        session = settings.Session()

        new_conn = Connection(conn_id='slack_token')
        new_conn.set_password(SLACK_LEGACY_TOKEN)

        if not (session.query(Connection).filter(Connection.conn_id == 
         new_conn.conn_id).first()):
        session.add(new_conn)
        session.commit()
        else:
            msg = '\n\tA connection with `conn_id`=conn_id already exists\n'
            msg = msg.format(conn_id=new_conn.conn_id)
            print(msg)

    dag = DAG(
        'add_connections',
        default_args=default_args,
        schedule_interval="@once")


    t2 = PythonOperator(
        dag=dag,
        task_id='add_slack_token',
        python_callable=add_slack_token,
        provide_context=True,
    )

【讨论】:

【参考方案7】:

我编写了以下实用方法来创建一个会话到保存在 Airflow 中的外部数据库配置:

from airflow.hooks.base_hook import BaseHook
from sqlalchemy.orm.session import sessionmaker


def get_session(conn_id):
    dbhook = BaseHook.get_hook(conn_id=conn_id)
    engine = create_engine(dbhook.get_uri())
    Session = sessionmaker()
    session = Session(bind=engine)
    return session

【讨论】:

以上是关于使用 Apache 气流存储和访问密码的主要内容,如果未能解决你的问题,请参考以下文章

如何使用apache气流调度谷歌云bigquery存储过程

气流:每日刷新后如何在 s3 存储桶中公开对象

安装 Apache Airflow 后出错

apache气流中BeamRunPythonPipelineOperator和DataFlowPythonOperator的区别

远程气流Dags

使用 docker-compose 在 Windows 上构建气流时出错