Airflow:如何从不同的服务器进行 SSH 和运行 BashOperator

Posted

技术标签:

【中文标题】Airflow:如何从不同的服务器进行 SSH 和运行 BashOperator【英文标题】:Airflow: How to SSH and run BashOperator from a different server 【发布时间】:2017-01-20 07:42:07 【问题描述】:

有没有办法通过 ssh 连接到不同的服务器并使用 Airbnb 的 Airflow 运行 BashOperator? 我正在尝试使用 Airflow 运行 hive sql 命令,但我需要 SSH 到另一个盒子才能运行 hive shell。 我的任务应该是这样的:

    SSH 到 server1 启动 Hive 外壳 运行 Hive 命令

谢谢!

【问题讨论】:

【参考方案1】:

不适用于气流 2.x。

我想我只是想通了:

    在 UI 中的 Admin > Connection 下创建 SSH 连接。注意:如果你重置数据库,连接将被删除

    在 Python 文件中添加以下内容

     from airflow.contrib.hooks import SSHHook
     sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
    

    添加 SSH 操作员任务

     t1 = SSHExecuteOperator(
         task_id="task1",
         bash_command=<YOUR COMMAND>,
         ssh_hook=sshHook,
         dag=dag)
    

谢谢!

【讨论】:

请注意,您还必须导入操作符: from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator 使用最新的气流版本 1.10 SSHExecuteOperator 已弃用,必须使用新的 SSHOperator。如果有人使用 1.10,那么新的导入应该是 from airflow.contrib.hooks.ssh_hook import SSHHookfrom airflow.contrib.operators.ssh_operator import SSHOperator 在气流变量中创建 SSH 连接需要哪些参数? @Biranjan SSHOperator 已损坏:“NoneType”对象没有“startswith”属性。我在哪里可以找到 SSHExecuteOperator?现在还能用吗? @nicolamarangoni 你得到这个是因为你在参数中使用了 bash_command。使用“command”而不是“bash_command”,你不会得到错误。使用 bash_command 将命令属性设置为 None 并导致错误。【参考方案2】:

Anton 的回答需要注意的一点是,SSHOperator 对象的参数实际上是 ssh_conn_id,而不是 conn_id。至少在 1.10 版本中。

一个简单的例子看起来像

from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
default_args = 
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),

dag = DAG(dag_id='testing_stuff',
          default_args=default_args,
          schedule_interval='0,10,20,30,40,50 * * * *',
          dagrun_timeout=timedelta(seconds=120))
# Step 1 - Dump data from postgres databases
t1_bash = """
echo 'Hello World'
"""
t1 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='test_ssh_operator',
    command=t1_bash,
    dag=dag)

【讨论】:

这应该是apache气流1.10的答案 做得很好。哈哈@# Step 1 - Dump data from postgres databases 顺便说一句,我在哪里可以找到 ssh_conn_id 或者只是随机命名? ssh_conn_id 是您在 Airflow UI 中创建连接或在 Admin -> Connections 下创建现有连接时键入的名称 在气流变量中创建 SSH 连接需要哪些参数?【参考方案3】:

这是 Airflow 2 中 ssh 运算符的工作示例:

[注意:此运算符的输出是 base64 编码的]

from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
sshHook = SSHHook(ssh_conn_id="conn-id", key_file='/opt/airflow/keys/ssh.key')
# a hook can also be defined directly in the code:
# sshHook = SSHHook(remote_host='server.com', username='admin', key_file='/opt/airflow/keys/ssh.key')

ls = SSHOperator(
        task_id="ls",
        command= "ls -l",
        ssh_hook = sshHook,
        dag = dag)

conn-id 是在 Admin -> Connections 中设置的。 key_file 是 ssh 私钥。

【讨论】:

嗨@artBcode 'key_file` 是我气流机的公钥吗?

以上是关于Airflow:如何从不同的服务器进行 SSH 和运行 BashOperator的主要内容,如果未能解决你的问题,请参考以下文章

我在空闲时收到很多交易(Airflow 和 Azure 文件共享)

具有 SSH 连接的 Airflow DAG 无法按计划同时开始运行

Airflow SSH 操作员错误:遇到 RSA 密钥,应为 OPENSSH 密钥

ssh-keygen 与不同的用户进行颠覆

Airflow:如何从 Postgres Operator 推送 xcom 价值?

如何从 UI 停止/终止 Airflow 任务