使用 Apache 气流存储和访问密码
Posted
技术标签:
【中文标题】使用 Apache 气流存储和访问密码【英文标题】:Store and access password using Apache airflow 【发布时间】:2017-12-30 01:02:03 【问题描述】:我们使用 Airflow 作为调度程序。我想在 DAG 中调用一个简单的 bash 运算符。 bash 脚本需要密码作为参数来进行进一步处理。
如何在 Airflow (config/variables/connection
) 中安全地存储密码并在 dag 定义文件中访问它?
我是 Airflow 和 Python 的新手,因此我将不胜感激使用 sn-p 代码。
【问题讨论】:
【参考方案1】:您可以将密码存储在 Hook 中 - 只要您设置了 fernet 密钥,该密码就会被加密。
以下是通过 UI 创建连接的方法:
然后:
要访问此密码:
from airflow.hooks.base_hook import BaseHook # Deprecated in Airflow 2
connection = BaseHook.get_connection("username_connection")
password = connection.password # This is a getter that returns the unencrypted password.
Airflow 2 发布后的更新
库airflow.hooks.base_hook
has been deprecated,您必须改用airflow.hooks.base
。
【讨论】:
这里是创建 postgres 连接字符串的方法。connection_string = 'postgresql://' + str(connection.login) + ':' + str(connection.password) + '@' \ + str(connection.host) + ':' + str(connection.port).encode('utf-8') + '/' + str(connection.schema)
这可以在Operator
内完成吗(以便下游任务 可以使用conn_id
引用连接)?请参阅this 以进一步详细说明我的查询
我不明白如何仅仅实例化 Connection
对象会在Airflow
的meta-db 中创建一个条目?难道我们不必像this 一样执行session.add(..)
来持久化它吗?
这根本不保持连接,有没有办法在 UI 之外做到这一点?只是像@y2k-shubham 所说的那样初始化一个对象,在范围之外什么都不做。
@y2k-shubham 我已经找到了一种正确的做事方式,并在下面发布了我的答案。【参考方案2】:
from airflow.hooks.base_hook import BaseHook
conn = BaseHook.get_connection('bigquery_connection')
print(conn.get_extra())
这些conn.get_extra()
将为您提供存储在连接中的设置的 JSON。
【讨论】:
【参考方案3】:您可以将密码存储在气流变量中,https://airflow.incubator.apache.org/ui.html#variable-view
-
在 UI 中创建一个带有 key&value 的变量,例如 mypass:XXX
导入变量
from airflow.models import Variable
MyPass = Variable.get("mypass")
将 MyPass 传递给您的 bash 脚本:
command = """
echo " params.my_param "
"""
task = BashOperator(
task_id='templated',
bash_command=command,
params='my_param': MyPass,
dag=dag)
【讨论】:
@MatthijsBrouns 的评论不再正确(适用于 Airflow 1.9+)。变量存储在加密的数据库中。但是,请注意,它们的值在 Airflow 网络用户界面中以纯文本形式显示,如果您有机会在不应该看到该值的人面前四处浏览。【参考方案4】:使用管理/连接选项卡中的 GUI。
真正有效的答案是,以编程方式在 Airflow 中保持连接,如下面的 sn-p 所示。
在下面的示例中,myservice
代表一些外部凭据缓存。
使用以下方法时,您可以将您在外部管理的连接存储在气流内部。无需从每个 dag/任务中轮询服务。相反,您可以依赖气流的连接机制,而您也不必失去 Airflow 暴露的 Operator(如果您的组织允许这样做)。
诀窍是使用airflow.utils.db.merge_conn
来处理您创建的连接对象的设置。
from airflow.utils.db import provide_session, merge_conn
creds = "user": myservice.get_user(), "pwd": myservice.get_pwd()
c = Connection(conn_id=f'your_airflow_connection_id_here',
login=creds["user"],
host=None)
c.set_password(creds["pwd"])
merge_conn(c)
merge_conn 是内置的,气流本身使用它来初始化空连接。但是它不会自动更新。为此,您必须使用自己的辅助函数。
from airflow.utils.db import provide_session
@provide_session
def store_conn(conn, session=None):
from airflow.models import Connection
if session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
logging.info("Connection object already exists, attempting to remove it...")
session.delete(session.query(Connection).filter(Connection.conn_id == conn.conn_id).first())
session.add(conn)
session.commit()
【讨论】:
【参考方案5】:在这种情况下,我将使用 PythonOperator,您可以从中获取 Hook
在您的数据库连接上使用
hook = PostgresHook(postgres_conn_id=postgres_conn_id)
。然后,您可以在此挂钩上调用get_connection
,这将为您提供一个 Connection 对象,您可以从中获取数据库连接的主机、登录名和密码。
最后,例如使用subprocess.call(your_script.sh, connection_string)
将连接详细信息作为参数传递。
这种方法有点复杂,但它确实允许您在 Airflow 中保持数据库连接的加密。此外,您应该能够将此策略拉入一个单独的 Operator 类,继承 PythonOperator 的基本行为,但添加获取钩子和调用 bash 脚本的逻辑。
【讨论】:
【参考方案6】:这是我用过的。
def add_slack_token(ds, **kwargs):
""""Add a slack token"""
session = settings.Session()
new_conn = Connection(conn_id='slack_token')
new_conn.set_password(SLACK_LEGACY_TOKEN)
if not (session.query(Connection).filter(Connection.conn_id ==
new_conn.conn_id).first()):
session.add(new_conn)
session.commit()
else:
msg = '\n\tA connection with `conn_id`=conn_id already exists\n'
msg = msg.format(conn_id=new_conn.conn_id)
print(msg)
dag = DAG(
'add_connections',
default_args=default_args,
schedule_interval="@once")
t2 = PythonOperator(
dag=dag,
task_id='add_slack_token',
python_callable=add_slack_token,
provide_context=True,
)
【讨论】:
【参考方案7】:我编写了以下实用方法来创建一个会话到保存在 Airflow 中的外部数据库配置:
from airflow.hooks.base_hook import BaseHook
from sqlalchemy.orm.session import sessionmaker
def get_session(conn_id):
dbhook = BaseHook.get_hook(conn_id=conn_id)
engine = create_engine(dbhook.get_uri())
Session = sessionmaker()
session = Session(bind=engine)
return session
【讨论】:
以上是关于使用 Apache 气流存储和访问密码的主要内容,如果未能解决你的问题,请参考以下文章
apache气流中BeamRunPythonPipelineOperator和DataFlowPythonOperator的区别