python paramiko 多线程批量执行指令及批量上传文件和目录

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python paramiko 多线程批量执行指令及批量上传文件和目录相关的知识,希望对你有一定的参考价值。

源代码:

https://github.com/jy1779/be.git

环境需求:

1、python3

2、paramiko

pip install --upgrade pip

apt-get install libssl-dev

pip3 install paramiko

3、执行权限

chmod +x becmd.py

ln -s /root/be/bin/becmd.py /usr/local/sbin/becmd

chmod +x besync.py

ln -s /root/be/bin/besync.py /usr/local/sbin/besync

4、导入路径设置

cd /usr/lib/python3.5/dist-packages/

touch be.pth

vim be.pth

/root/be     # be.pth 文件内容(文件里只写路径 /root/be 这一行;.pth 文件不支持行尾注释)

5、由于脚本是在 Windows 上开发的,文件行尾是 CRLF(\r\n),上传到 Linux 后会出现以下问题:

从 Windows 上传的文件,可以用 dos2unix 指令转换成 Unix 格式(LF 行尾)。

apt install dos2unix

root@host:~/be/bin# ./besync.py

/usr/bin/env: ‘python3\r’: No such file or directory

root@host:~/be/bin# dos2unix besync.py

dos2unix: converting file besync.py to Unix format ...

root@host:~/be/bin# ./besync.py

Reminder: The source and destination addresses do not exist

Usage: ./besync.py <source address> <destination address>

解决

apt install dos2unix

dos2unix becmd.py

dos2unix besync.py  

6、日志路径的问题。最好是绝对路径,不然只能在be目录下执行becmd 和besync

f = open("/root/be/logs/besync.log", "a")

f = open("/root/be/logs/becmd.log", "a")

程序目录:

.
├── app
│   ├── __init__.py
│   ├── pwd_connect_cmd.py
│   ├── pwd_connect_sync.py
│   ├── __pycache__
│   │   ├── __init__.cpython-35.pyc
│   │   ├── pwd_connect_cmd.cpython-35.pyc
│   │   ├── pwd_connect_sync.cpython-35.pyc
│   │   ├── ssh_be_cmd.cpython-35.pyc
│   │   └── ssh_be_sync.cpython-35.pyc
│   ├── ssh_be_cmd.py 
│   └── ssh_be_sync.py
├── bin
│   ├── becmd.py
│   ├── besync.py
│   └── __init__.py
├── conf
│   ├── config.py
│   ├── __init__.py
│   └── __pycache__
│       ├── config.cpython-35.pyc
│       └── __init__.cpython-35.pyc
├── __init__.py
└── logs
    ├── becmd.log
    ├── besync.log
    └── __init__.py


app/ssh_be_cmd.py

import paramiko
import threading
import datetime
class MyThread(threading.Thread):
    """Thread that runs one shell command on one remote host over SSH.

    The command's stdout (or its stderr when stdout is empty) is printed
    under a coloured banner naming the host, and every invocation is
    appended to /root/be/logs/becmd.log.
    """

    def __init__(self, ip, port, username, password, cmd):
        """Store the connection details and prepare the SSH client.

        Args:
            ip: Host name or address to connect to.
            port: SSH port; any value accepted by ``int()``.
            username: Login name.
            password: Login password.
            cmd: Command line to execute on the remote host.
        """
        self.ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts any unknown host key, which
        # leaves the connection open to man-in-the-middle attacks. Kept for
        # backward compatibility; consider loading known host keys instead.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        super(MyThread, self).__init__()
        self.ip = ip
        self.port = port
        self.username = username
        self.password = password
        self.cmd = cmd

    def run(self):
        """Connect, execute the command, log the invocation and print the output.

        The SSH client is closed even when connecting, executing or logging
        raises, so a failing host does not leak its connection.
        """
        port = int(self.port)
        try:
            self.ssh.connect(hostname=self.ip, port=port, username=self.username, password=self.password)
            stdin, stdout, stderr = self.ssh.exec_command(self.cmd)
            res, err = stdout.read(), stderr.read()
            # Show stderr only when the command produced no stdout at all.
            result = res if res else err
            with open("/root/be/logs/becmd.log", "a") as f:
                f.write(str(datetime.datetime.now()) + " ")
                f.write(self.ip + " ")
                f.write(self.username + " ")
                f.write(self.cmd + "\n")
            print("\033[1;32;40m" + self.ip.rjust(33, '=') + "\033[0m", "\033[1;32;40m" + "Command result: ".ljust(37, '=') + "\033[0m")
            # Remote output is not guaranteed to be valid UTF-8; substitute
            # undecodable bytes instead of crashing the thread.
            print(result.decode(errors="replace"))
        finally:
            self.ssh.close()

app/pwd_connect_cmd.py

import sys
from app.ssh_be_cmd import MyThread
from conf.config import account
def pwd_con(host):
    """Start a thread that runs the command-line command on one configured host.

    The command is every script argument joined with single spaces. When
    no argument was given, a reminder is printed and the program exits.

    Args:
        host: Key into the ``account`` mapping imported from conf.config.

    Raises:
        KeyError: If ``host`` or one of its connection fields is missing.
        SystemExit: If the script was started without a command.
    """
    # Take every argument; there is no reason to cap the command length.
    args = sys.argv[1:]
    if not args:
        print("Reminder: The command does not exist")
        # sys.exit is always available, unlike the site-provided exit().
        sys.exit()
    entry = account[host]
    worker = MyThread(
        entry["ip"],
        entry["port"],
        entry["username"],
        entry["password"],
        " ".join(args),
    )
    worker.start()
def connect():
    """Launch the command on every host listed in ``account``."""
    for host_key in account:
        pwd_con(host_key)

app/ssh_be_sync.py

import paramiko
import threading
import datetime,time
import os
from os.path import getsize
class MyThread(threading.Thread):
    """Thread that uploads one file or one directory tree to a host over SFTP.

    ``cmd`` is the argument vector of the calling script:
    ``[program, "-f" | "-d", source, destination]``. Any other shape prints
    the usage text. Every transfer is appended to
    /root/be/logs/besync.log.
    """

    def __init__(self, ip, port, username, password, cmd):
        """Store the connection details and the argument vector.

        Args:
            ip: Host name or address to connect to.
            port: SSH port; any value accepted by ``int()``.
            username: Login name.
            password: Login password.
            cmd: Sequence of command-line arguments, program name first.
        """
        super(MyThread, self).__init__()
        self.ip = ip
        self.port = port
        self.username = username
        self.password = password
        self.cmd = cmd

    def run(self):
        """Validate the arguments, connect, upload, and always close the transport."""
        usage="""
                -f send file to remote host.
                   %s -f <source address> <destination address>
                -d send dir to remote host.
                   %s -d <source address> <destination address>
                --help show help.
                   %s --help
            """%(self.cmd[0],self.cmd[0],self.cmd[0])
        # Check the arguments first so that showing the usage text does not
        # need a connection to the host.
        if len(self.cmd) != 4 or self.cmd[1] not in ("-f", "-d"):
            print(usage)
            return
        port = int(self.port)
        self.transport = paramiko.Transport((self.ip, port))
        try:
            self.transport.connect(username=self.username, password=self.password)
            self.sftp = paramiko.SFTPClient.from_transport(self.transport)
            if self.cmd[1] == "-f":
                self._send_file(self.cmd[2], self.cmd[3])
            else:
                self._send_dir(self.cmd[2], self.cmd[3])
        finally:
            # Close on every path, including errors and skipped transfers.
            self.transport.close()

    @staticmethod
    def _dir_chain(path):
        """Return each cumulative prefix of path: "/a/b" -> ["/a", "/a/b"]."""
        parts = path.split("/")
        chain = []
        current = ""
        if parts[0] != "":
            # Relative path: keep it relative instead of rooting it at "/".
            current = parts[0]
            chain.append(current)
        for part in parts[1:]:
            current += "/" + part
            chain.append(current)
        return chain

    def _create_remote_dirs(self, dirs):
        """Create every remote directory in dirs that does not exist yet."""
        for item in dirs:
            try:
                self.sftp.stat(item)
            except FileNotFoundError:
                print("Create a new directory: ", item)
                self.sftp.mkdir(item)

    def _log_transfer(self, src, des):
        """Append one line describing this transfer to the besync log."""
        fields = (
            str(datetime.datetime.now()),
            self.ip,
            self.username,
            self.cmd[0],
            self.cmd[1],
            src,
            des,
        )
        with open("/root/be/logs/besync.log", "a") as f:
            f.write(" ".join(fields) + "\n")

    def _send_file(self, src, des):
        """Upload the local file src to the remote path des, creating parent dirs."""
        self._log_transfer(src, des)
        time_start = time.time()
        if not os.path.isfile(src):
            print("Reminder: The source file does not exist: " + src)
            return
        # Every ancestor of des, without the file name itself.
        self._create_remote_dirs(self._dir_chain(des)[:-1])
        self.sftp.put(src, des)
        total_time = time.time() - time_start
        print("\033[1;32;40mSend Successful.\033[0m")
        print("total size: " + str(getsize(src)) + " bytes")
        print("total time: " + str(total_time))

    def _send_dir(self, src, des):
        """Upload the local directory src, with its whole tree, beneath remote des.

        The remote copy keeps the last component of src, e.g. sending
        "tmp/conf" to "/opt" produces "/opt/conf/...".
        """
        self._log_transfer(src, des)
        sep = "/"
        # Breadth-first listing: entries appended during the loop are
        # visited later by the same loop, which walks the whole tree.
        entries = [src + sep + name for name in os.listdir(src)]
        for entry in entries:
            if os.path.isdir(entry):
                for name in os.listdir(entry):
                    entries.append(entry + sep + name)
        local_dirs = [src] + [entry for entry in entries if os.path.isdir(entry)]
        local_files = [entry for entry in entries if os.path.isfile(entry)]

        # Length of the part of src up to and including its last "/".
        # Every listed path starts with src, so slicing this prefix off
        # yields the path relative to the parent of src. Slicing, unlike
        # str.split on the prefix, is safe when the prefix text recurs
        # later in a path or when the prefix is empty.
        parent_len = src.rfind("/") + 1

        def to_remote(local_path):
            return des + "/" + local_path[parent_len:]

        remote_dirs = [des] + [to_remote(item) for item in local_dirs]
        # remote_dirs[1] is the remote counterpart of src itself; make sure
        # all of its ancestors exist before creating the tree below it.
        self._create_remote_dirs(self._dir_chain(remote_dirs[1]))
        self._create_remote_dirs(remote_dirs)
        remote_files = [to_remote(item) for item in local_files]

        time_start = time.time()
        for local_file, remote_file in zip(local_files, remote_files):
            self.sftp.put(local_file, remote_file)
        total_time = time.time() - time_start
        print("\033[1;32;40mSend Successful.\033[0m")
        print("total time: " + str(total_time))

app/pwd_connect_sync.py

import sys
from app.ssh_be_sync import MyThread
from conf.config import  account
def pwd_con(host):
    """Start an upload thread for one configured host.

    Args:
        host: Key into the ``account`` mapping imported from conf.config.

    The thread receives the script's argument vector (program name
    included, at most the first 100 items) and interprets it itself.
    """
    entry = account[host]
    worker = MyThread(
        entry["ip"],
        entry["port"],
        entry["username"],
        entry["password"],
        sys.argv[:100],
    )
    worker.start()
def connect():
    """Launch the upload on every host listed in ``account``."""
    for host_key in account:
        pwd_con(host_key)

conf/config.py

# Hosts the tools operate on, keyed by an arbitrary host label (here the
# address itself). Each entry carries the SSH connection details as strings.
# NOTE(review): passwords are stored here in plain text.
account = {
    "192.168.1.57": {
        "ip": "192.168.1.57",
        "port": "22",
        "username": "root",
        "password": "123456",
    },
    "192.168.1.75": {
        "ip": "192.168.1.75",
        "port": "22",
        "username": "root",
        "password": "123456",
    },
}

bin/becmd.py

#!/usr/bin/env python3
"""Command-line entry point: run the given command on every configured host."""
from app.pwd_connect_cmd import connect

# Only start the SSH sessions when executed as a script, not on import.
if __name__ == "__main__":
    connect()

bin/besync.py

#!/usr/bin/env python3
"""Command-line entry point: upload a file or directory to every configured host."""
from app.pwd_connect_sync import connect

# Only start the uploads when executed as a script, not on import.
if __name__ == "__main__":
    connect()

使用例子:

1、批量执行指令:

技术分享

2、批量上传文件

技术分享

3、批量上传目录

技术分享

本文出自 “微风清凉” 博客,谢绝转载!

以上是关于python paramiko 多线程批量执行指令及批量上传文件和目录的主要内容,如果未能解决你的问题,请参考以下文章

python 通过paramiko模块批量执行ssh命令

网卡速率变化导致paramiko模块timeout的失效,多线程超时控制解决办法。

用Python多进程和paramiko给主机组批量分发命令和传送文件

使用paramiko批量更改root密码

使用Python之paramiko模块和threading实现多线程登录多台Linux服务器

是否可以使用 SIMD 指令批量处理相同的功能?