Python 远程操作 Linux:增删目录以及文件
Posted by sunshinekimi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用 Python(paramiko)远程操作 Linux、增删目录以及文件的相关知识,希望对你有一定的参考价值。
"""Paramiko-based helpers for managing files and directories on a remote Linux host.

version 0.1:
    Earlier revisions opened a new connection inside every call and never
    shut it down.  The connections are now opened once in the constructor and
    released together by ``WebankSftp.sftp_close``.  Remote upload and
    download are not implemented here; extend ``WebankSftp`` to add them.
"""
import json
import logging
import os
import posixpath
import shlex
import unittest
from stat import S_ISDIR, S_ISREG

import paramiko

logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s -->%(funcName)s at line %(lineno)d: %(message)s',
)
log = logging.getLogger()


# paramiko client
def WebankClients(ip, port, user, pwd):
    """Return a connected ``WebankSftp`` for the given host and credentials."""
    return WebankSftp(ip, port, user, pwd)


class WebankSftp(object):
    """Run shell commands and SFTP operations against one remote host.

    Three connections are opened by the constructor:

    * ``cli``       -- SFTP client on a dedicated ``paramiko.Transport``
    * ``para_cli``  -- ``paramiko.SSHClient`` used for ``exec_command``
    * ``para_sftp`` -- SFTP client opened on top of ``para_cli``

    Call ``sftp_close()`` (or use the instance as a context manager) to
    release them.

    Example::

        ip = '192.168.110.137'
        port = 22
        pwd = 'admin'
        user = 'root'
        client = WebankClients(ip, port, user, pwd)
        client.query_dir("/usr/local/")
        # ['python36', 'Python-3.6.6', 'Python-3.6.6.tgz', 'test6']
        client.query_dir("/usr/local/test677")
        # FileExistsError: ls: cannot access /usr/local/test677:
        #     No such file or directory
    """

    def __init__(self, ip, port, user, pwd):
        """Open all connections; raise if any of them cannot be established.

        Raises:
            ConnectionError: the SSH client connection failed (the underlying
                error has already been logged by ``initial``).
        """
        self.port = port
        self.pwd = pwd
        self.user = user
        self.ip = ip
        # Keep the SftpOpen wrapper so its Transport can be closed later.
        self._sftp_open = SftpOpen(self.ip, self.port, self.user, self.pwd)
        self.cli = self._sftp_open.sftp
        self.para_cli = self.initial()
        if self.para_cli is None:
            # Do not leave the already-open transport dangling.
            self._sftp_open.close()
            raise ConnectionError(
                "cannot open SSH connection to %s:%s" % (self.ip, self.port))
        try:
            self.para_sftp = self.para_cli.open_sftp()
        except Exception:
            self.para_cli.close()
            self._sftp_open.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.sftp_close()
        return False

    def initial(self):
        """Create and connect a ``paramiko.SSHClient``.

        Returns:
            The connected client, or ``None`` when the connection failed (the
            error is logged).
        """
        client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts unknown host keys without
        # verification; load known hosts instead if the network is untrusted.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(self.ip, self.port, self.user, self.pwd)
        except Exception as e:
            log.error(e)
            client.close()
            return None
        return client

    # paramiko function
    def query_dir(self, dir) -> list:
        """List the entry names of remote directory ``dir`` via ``ls -l``.

        ``dir`` is treated as one literal path: it is shell-quoted, so globs
        and ``~`` are not expanded.  Names are taken from the ninth ``ls -l``
        column, so names containing whitespace are truncated.

        Returns:
            The list of names (empty when the directory is empty).

        Raises:
            FileExistsError: ``ls`` produced no output but wrote to stderr,
                e.g. the path does not exist.  The type is kept for
                compatibility with existing callers.
        """
        shell = "ls -l %s | awk '{print $9}'" % shlex.quote(dir)
        _, out, err = self.para_cli.exec_command(shell)
        lines, err_bytes = out.readlines(), err.read()
        if lines:
            # The "total N" header has no ninth column and yields a blank
            # line; strip() also removes the trailing newline of each name.
            return [line.strip() for line in lines if line.strip()]
        if err_bytes:
            raise FileExistsError(err_bytes.decode("utf-8"))
        return []

    # paramiko function
    def callback_stdout(self, shell: str):
        """Execute ``shell`` on the remote host and return its output.

        The command is run with a pseudo-terminal (``get_pty=True``).

        Returns:
            The concatenated stdout text; otherwise the decoded stderr text;
            ``None`` when there was no output or the call failed (the error
            is logged).
        """
        try:
            _, out, err = self.para_cli.exec_command(shell, get_pty=True)
            lines, err_bytes = out.readlines(), err.read()
            if lines:
                text = "".join(lines)
                log.info("call_back res:%s" % text)
                return text
            if err_bytes:
                err_text = err_bytes.decode("utf-8")
                log.info("call_back err: %s" % err_text)
                return err_text
        except Exception as f:
            log.error(f)
        return None

    def touch(self, filepath):
        """Create an empty remote file, replacing any existing one.

        The call blocks until the remote ``touch`` has finished, so the file
        is visible to an immediately following ``exists_file``.
        """
        existed = self.exists_file(filepath)
        if existed:
            self.removes(filepath)

        _, out, err = self.para_cli.exec_command(
            "touch %s" % shlex.quote(filepath))
        err_bytes = err.read()
        # exec_command returns before the command completes; wait for it.
        status = out.channel.recv_exit_status()
        if status != 0:
            log.error("touch %s failed: %s"
                      % (filepath, err_bytes.decode("utf-8", "replace")))
            return

        if existed:
            info = {
                "msg": "File {} found and have remove it ,success create {}".format(
                    filepath, filepath),
                "code": 200,
                "status": "success",
            }
            log.info(json.dumps(info, indent=4, ensure_ascii=False))

    # sftp func
    def mkdir(self, dir):
        """Create remote directory ``dir``.

        An existing *empty* directory of the same name is removed and
        re-created.  Creation fails (and is logged, not raised) when a file
        of the same name exists or the existing directory is not empty.
        """
        client = self.cli
        try:
            if self.exists_dir(dir):
                client.rmdir(dir)
            client.mkdir(dir)
        except Exception:
            log.error("{'msg': 'mkdir %s failed maybe it have exists same name file or dir ,'code':100,'status': 'failed'}" % dir)
        else:
            log.info("{'msg': 'mkdir %s success','code': 200,'status':'success'}" % dir)

    # sftp func
    def removes(self, filepath):
        """Delete remote file ``filepath``; do nothing when it does not exist."""
        if not self.exists_file(filepath):
            return
        try:
            self.cli.remove(filepath)
        except FileNotFoundError:
            # The file vanished between the existence check and the removal.
            info = {'msg': 'File %s Not Found Error' % filepath,
                    'retcode': 100, 'status': 'failed'}
            log.error(json.dumps(info, ensure_ascii=False, indent=4),
                      exc_info=False)
        else:
            log.info("{'msg':'remove file %s success','recode':200,'status': 'success'}" % filepath)

    # sftp func
    def list_dir(self, dir):
        """Return the entry names of remote directory ``dir``.

        Returns ``None`` (after logging) when the directory does not exist.
        """
        try:
            res = self.cli.listdir(dir)
        except FileNotFoundError:
            log.error("{'msg': '%s Not Found Error', 'retcode':100,'status': 'failed'}" % dir)
            return None
        information = json.dumps({"files": res}, indent=4, ensure_ascii=False)
        log.info("%s contains files:" % dir + information)
        return res

    # sftp function
    def rm(self, absdir):
        """Recursively delete remote directory ``absdir`` and its contents.

        Errors are logged, not raised.  Symbolic links are removed as links;
        they are never followed.
        """
        sftp = self.cli
        try:
            for entry in sftp.listdir(absdir):
                # Remote paths are POSIX paths regardless of the local OS.
                child = posixpath.join(absdir, entry)
                # lstat: a symlink to a directory must not be recursed into.
                if S_ISDIR(sftp.lstat(child).st_mode):
                    self.rm(child)
                else:
                    sftp.remove(child)
                    log.info("{'msg': 'remove file %s success,'retcode': 200}" % (child))
            # The directory is empty now (unless a nested delete failed).
            sftp.rmdir(absdir)
        except Exception as e:
            log.error(e)
        else:
            log.info("{'msg': 'recursion delete %s whole dir,'recode':200}" % (absdir))

    # sftp func
    def exists_dir(self, dir):
        """Return True when ``dir`` exists on the remote host and is a directory."""
        try:
            is_dir = S_ISDIR(self.cli.stat(dir).st_mode)
        except FileNotFoundError:
            return False
        if is_dir:
            log.info("exists %s " % dir)
        return is_dir

    # sftp func
    def exists_file(self, filepath):
        """Return True when ``filepath`` exists on the remote host.

        Any kind of entry counts (a directory also returns True).
        """
        try:
            self.para_sftp.stat(filepath)
        except (OSError, paramiko.SSHException):
            return False
        log.info("exists %s file " % filepath)
        return True

    def sftp_close(self):
        """Close every connection opened by the constructor.

        All closes are attempted even if one fails; the first failure is
        re-raised afterwards.
        """
        # SFTP channels first, then the connections carrying them.
        closers = [self.para_sftp.close, self.para_cli.close, self.cli.close]
        opener = getattr(self, "_sftp_open", None)
        if opener is not None:
            closers.append(opener.tp.close)

        first_error = None
        for closer in closers:
            try:
                closer()
            except Exception as e:
                log.error(e)
                if first_error is None:
                    first_error = e
        if first_error is not None:
            raise first_error


class SftpOpen(object):
    """Open an SFTP client on its own ``paramiko.Transport``.

    Attributes:
        tp: the connected transport.
        sftp: the ``paramiko.SFTPClient`` running on ``tp``.
    """

    def __init__(self, ip, port, user, pwd):
        # Transport takes the address as ONE (host, port) tuple; passing the
        # port as a second positional argument would set the window size.
        self.tp = paramiko.Transport((ip, port))
        try:
            self.tp.connect(username=user, password=pwd)
            self.sftp = paramiko.SFTPClient.from_transport(self.tp)
        except Exception:
            self.tp.close()
            raise

    def close(self):
        """Close the SFTP client and its transport."""
        try:
            self.sftp.close()
        finally:
            self.tp.close()


class TestSftp(unittest.TestCase):
    """Self-check against a live host; adjust the address/credentials below."""

    @classmethod
    def setUpClass(cls) -> None:
        ip = "192.168.110.151"
        port = 22
        user = "root"
        pwd = "admin"
        cls.client = WebankClients(ip, port, user, pwd)
        log.info("start selfcheck method of sftp ")

    @classmethod
    def tearDownClass(cls) -> None:
        try:
            cls.client.sftp_close()
        except Exception as e:
            log.error(e)
        else:
            log.info("close all connections")
        log.info("finish all unittest selfcheck method of sftp ")

    def test_query_dir(self):
        """List the dirs and files under a directory."""
        files = self.client.query_dir("/usr/local/listdir")
        self.assertIn('list.txt', files)

    def test_call_backstdout(self):
        shell = "ls -l /usr/local"
        readlines = self.client.callback_stdout(shell)
        self.assertIn("redisdb", readlines)

    def test_exsitdir(self):
        self.assertTrue(self.client.exists_dir("/usr/local"))

    def test_exsistfile(self):
        self.assertTrue(
            self.client.exists_file("/usr/local/redisdb/logs/redis.log"))

    def test_touch(self):
        """create file """
        path = "/usr/local/toutest.txt"
        self.client.touch(path)
        self.assertTrue(self.client.exists_file(path))
        self.client.removes(path)

    def test_userremove(self):
        """remove file """
        path = "/usr/local/tou.txt"
        self.client.touch(path)
        self.client.removes(path)
        self.assertFalse(self.client.exists_file(path))

    def test_mkandrm(self):
        """Known limitation: an existing non-empty directory cannot be re-created."""
        self.client.mkdir("/usr/local/test1")
        self.assertTrue(self.client.exists_dir("/usr/local/test1"))
        self.client.rm("/usr/local/test1")
        self.assertFalse(self.client.exists_dir("/usr/local/test1"))


if __name__ == '__main__':
    unittest.main()
.......
----------------------------------------------------------------------
Ran 7 tests in 1.030s
OK
Process finished with exit code 0
以上是关于 Python 远程操作 Linux、增删目录以及文件的主要内容。如果未能解决你的问题,请参考以下文章:
Linux下编写简单C程序连接MySQL数据库,实现简单的增删查改操作
把字符串的增删查改,插入以及删除各种操作封装为一个用c代码写的库,代码怎么写,我写不来求教
Python学习笔记_05:使用Flask+MySQL实现用户登陆注册以及增删查改操作