以编程方式在 dockerized apache 气流 python 操作符内创建 SSH 隧道
Posted
技术标签:
【中文标题】以编程方式在 dockerized apache 气流 python 操作符内创建 SSH 隧道【英文标题】:Programatically create SSH tunnel inside of dockerized apache airflow python operator 【发布时间】:2019-11-04 12:41:36 【问题描述】:我的程序在运行 apache 气流的 docker 容器内时无法创建 SSH 隧道。只有在我的本地机器上运行该功能才能正常工作。我有一个服务器列表,用于创建隧道、查询数据库和关闭连接。通常,我会这样做:
for server in servers:
server_conn = sshtunnel.SSHTunnelForwarder(
server,
ssh_username=ssh_user,
ssh_password=ssh_password,
remote_bind_address=(localhost, db_port),
local_bind_address=(localhost, localport)
)
这按预期工作,我可以从那里做任何我需要的事情。但是,在 Docker 中,它不起作用。我意识到 docker 运行并绑定到一个端口,实际上并不独立于主机系统,所以我使用network_mode="host"
来帮助缓解这个问题。但是,这不起作用,因为我的容器失去了相互通信的能力。这是我的 docker-compose 文件
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
- PGDATA=/var/lib/postgresql/data/pgdata
volumes:
- ~/.whale/pgdata:/var/lib/postgresql/data/pgdata
- ./dags/docker/sql/create.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
webserver:
image: hawk
build:
context: .
dockerfile: ./dags/docker/Dockerfile-airflow
restart: always
depends_on:
- postgres
# - redis
environment:
- LOAD_EX=n
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Local
volumes:
- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
ports:
- "8080:8080"
- "52023:22"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
我还按照here 的说明进行操作,达到了可以将docker exec
放入我的容器并手动键入上述 python sn-p 并获得工作连接的地步。
此外,我已经阅读了气流文档here,其中涵盖了 SSH 连接运算符,但那些仅支持 bash 命令,我需要运行我的 python 函数。我真的很困惑为什么 python 代码在exec
-ed 进入系统时会起作用,但当我通过气流 DAG 运行它时却不会。目前,我无法手动放入所有连接,因为一旦部署此系统,将有 > 100 个连接。任何帮助将不胜感激。如果需要更多深度,请告诉我。
【问题讨论】:
发现问题在于 Apache Airflow 目前不支持出站 SSH 隧道连接。最好的解决方法(我使用的)是将 API 端点与我需要的数据一起放置在有问题的服务器上。他们会阻止这个功能真是太奇怪了,我仍然不排除它可能是我自己的配置。 您好,您好运吗?我有同样的问题。我想以编程方式打开一个隧道并使用 Postgresql 到该隧道的连接 这里的示例代码gist.github.com/edthix/8bcb0eb8415d01e4302640cddf57f2b6 【参考方案1】:我在打开隧道并尝试在单独的任务中连接到数据库时遇到了同样的问题,但是通过在同一个任务中执行这两个任务使其正常工作(Airflow 不会在任务运行之间保持状态):
def select_from_tunnel_db():
# Open SSH tunnel
ssh_hook = SSHHook(ssh_conn_id='bastion-ssh-conn', keepalive_interval=60)
tunnel = ssh_hook.get_tunnel(5432, remote_host='<db_host>', local_port=5432)
tunnel.start()
# Connect to DB and run query
pg_hook = PostgresHook(
postgres_conn_id='remote-db-conn', # NOTE: host='localhost'
schema='db_schema'
)
pg_cursor = pg_hook.get_conn().cursor()
pg_cursor.execute('SELECT * FROM table;')
select_val = pg_cursor.fetchall()
return select_val
python_operator = PythonOperator(
task_id='test_tunnel_conn',
python_callable=select_from_tunnel_db,
dag=dag
)
这会将端口 5432 上的流量从本地机器转发到远程数据库主机上的同一端口。 SSHHook 需要与您将通过隧道通过的端点建立有效的 ssh 连接,而 PostgresHook 需要与端口 5432 上的“localhost”建立 postgres 连接。
【讨论】:
以上是关于以编程方式在 dockerized apache 气流 python 操作符内创建 SSH 隧道的主要内容,如果未能解决你的问题,请参考以下文章
如何以编程方式(在 Docker 上)获取运行容器的容器 ID?
以编程方式从Dockerfile中删除与docker build相关联的docker镜像
如何使用Python以编程方式在Apache Kafka中创建主题