python 主机宝
Posted 水·域
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 主机宝相关的知识,希望对你有一定的参考价值。
需求:开发一个主机批量管理系统,要求按saltstack方式执行命令
1 #!/usr/bin/env python3.5 2 # -*- coding:utf8 -*- 3 import os,sys,pickle,logging 4 BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 5 sys.path.append(BASEDIR) 6 from conf import setting 7 from core import file_handler 8 from core import db_handler 9 from core import host_handler 10 """ 11 ************************************ 12 此为主机宝主运行程序 13 ************************************ 14 """ 15 logging.basicConfig(level=logging.INFO, filename=os.path.join(BASEDIR,\'log/ssh.log\'), filemode=\'a\', 16 format=\'%(asctime)s %(levelname)s %(message)s\', datefmt=\'%Y/%m/%d %H:%M:%S\') 17 log = logging.getLogger(__name__) 18 def login(): 19 count = 0 20 flage = False 21 while count < 3: 22 count += 1 23 user_input = input("请输入用户名:").strip() 24 pass_input = input("请输入密码:").strip() 25 db = db_handler.handler(setting.DATABASE,user_input) 26 if os.path.isfile(db): 27 f = open(db,"rb") 28 data = pickle.loads(f.read()) 29 f.close() 30 if user_input == data["name"] and data["lock"] !=1: 31 if pass_input == data["password"]: 32 flage = True 33 log.info("用户[%s]登陆成功!"%user_input) 34 break 35 else: 36 print("用户名或密码错误!") 37 if count > 2: 38 with open(db,"wb") as f: 39 data["lock"] = 1 40 pickle.dump(data,f) 41 log.info("用户[%s]被锁定!"%user_input) 42 print("用户[%s]已被锁定!"%user_input) 43 else: 44 print("用户[%s]已被锁定!"%user_input) 45 exit() 46 if flage == True: 47 print("用户[%s]登陆成功!"%user_input) 48 men() 49 else: 50 exit() 51 def men(): 52 print("欢迎进入主机宝管理系统!") 53 host_men = """ 54 1、显示主机与所属组 55 2、增加组 56 3、增加主机 57 4、修改主机 58 5、删除主机 59 6、执行命令 60 7、退出管理系统 61 """ 62 host_dic ={ 63 "1":{"option":"显示主机与所属组","action":file_handler.show}, 64 "2":{"option":"增加组","action":file_handler.add_group}, 65 "3":{"option":"增加主机","action":file_handler.add_host}, 66 "4":{"option":"修改主机","action":file_handler.mod_host}, 67 "5":{"option":"删除主机","action":file_handler.host_delete}, 68 "6":{"option":"执行命令","action":host_handler.exciton}, 69 "7":{"option":"退出管理系统","action":exit} 70 } 71 exit_flag =False 72 while not exit_flag: 73 print(host_men) 74 option = input("请按键选择:").strip() 75 if option in host_dic: 76 func = host_dic[option].get("action") 77 func() 78 79 80 def run(): 81 login()
1 #!/usr/bin/env python3.5 2 # -*- coding:utf8 -*- 3 import os,sys,re 4 BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 5 sys.path.append(BASEDIR) 6 import logging,importlib 7 # 初始化日志格式及对象 8 logging.basicConfig(level=logging.INFO, filename=os.path.join(BASEDIR,\'log/ssh.log\'), filemode=\'a\', 9 format=\'%(asctime)s %(levelname)s %(message)s\', datefmt=\'%Y/%m/%d %H:%M:%S\') 10 log = logging.getLogger(__name__) 11 # 调用执行模块 12 def module_excute(moudle_name,func_name,arg): 13 try: 14 # 导入要执行的模块 15 module = importlib.import_module("salt.salt_{}".format(moudle_name)) 16 # 判断函数名是否包含在模块里 17 if hasattr(module,func_name): 18 func = getattr(module,func_name) 19 func(arg) 20 print("***" * 20) 21 else: 22 print("不存在") 23 except Exception as e : 24 log.info("input:{},error:{}".format(moudle_name,e)) 25 def exciton(): 26 usage = """ 27 salt "*" cmd.run "excute_cmd1,excute_cmd2..." :"所有主机执行命令" 28 salt -g "group" cmd.run "excute_cmd1,excute_cmd2..." :"指定组执行命令" 29 salt -h "ip_host" cmd.run "excute_cmd1,excute_cmd2..." :"指定主机IP执行命令" 30 salt "*" file.put "filename" :"所有主机上传文件" 31 salt "*" file.get "filename" :"所有主机下载文件" 32 exit :"退出" 33 """ 34 print("欢迎进入主机命令执行系统!") 35 user_cmd = input("请输入要执行的命令>>>:").strip() 36 if user_cmd.startswith("salt"): # 判断是否以salt开始 37 user_cmd_list = user_cmd.split() #以空格分割成列表 38 # 过滤掉特殊字符 39 user_arg_list = list(map(lambda x:re.sub(r\'[\\"\\\']\',"",x),user_cmd_list)) 40 # 匹配含点的模块名字 41 p = re.compile(r\'[a-zA-Z_]+\\.[a-zA-Z_]+\') 42 flag =False 43 count = 0 44 for i in user_arg_list: 45 if p.match(i): 46 flag = True 47 count +=1 48 moudle_func = i # 获取模块名 49 break # 只匹配第一个含点的模块名 50 # 只有命令里含*。*格式时,继续 51 if flag and count == 1: 52 cmd_list = user_arg_list[user_arg_list.index(moudle_func)+1:] # 获取原列表在此命令(*.*)之后的所有命令变成命令列表 53 obj_list = user_arg_list[user_arg_list.index("salt")+1:user_arg_list.index(moudle_func)] # 获取以salt开头模块函数结尾之前的所有内空转到列表 54 arg = (obj_list,cmd_list) # 将操作对象列表和指令列表放到元组中 55 moudle_name = moudle_func.split(".")[0] # 获取模块名 56 func_name = moudle_func.split(".")[1] # 获取函数名 57 module_excute(moudle_name,func_name,arg) 58 exciton() 59 else: 60 print("命令输入错误!请按以下格式输入:") 61 print(usage) 62 exciton() 63 elif user_cmd =="exit": 64 exit() 65 else: 66 print("命令输入错误!请按以下格式输入:") 67 print(usage) 68 exciton()
#!/usr/bin/env python3.5 # -*- coding:utf8 -*- import os,sys,pickle,re,logging BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASEDIR) from conf import setting from core import db_handler from core import host_handler db_path = db_handler.handler(setting.DATABASE,"host") if os.path.exists(db_path): with open(db_path, "rb") as f: data = pickle.loads(f.read()) else: data =[] # 初始化日志格式及对象 logging.basicConfig(level=logging.INFO, filename=os.path.join(BASEDIR,\'log/ssh.log\'), filemode=\'a\', format=\'%(asctime)s %(levelname)s %(message)s\', datefmt=\'%Y/%m/%d %H:%M:%S\') log = logging.getLogger(__name__) def check_ip(ip): for data_ip in data: for ip_data in data_ip: for iptest in data_ip[ip_data]: if ip in iptest["ip"]: return True else: return False def check_group(group): for gc in data: if group in gc.keys(): return True else: return False def add_group(): add_input = input("请输入要增加的组:").strip() list_data = [] for y_data in data: for k in y_data: list_data.append(k) if add_input not in list_data: new_group = {"%s"%add_input:[]} data.append(new_group) with open(db_path,"wb") as fw: pickle.dump(data,fw) log.info("增加组%s成功!"%add_input) print("增加组%s成功!"%add_input) else: log.error("增加组%s失败!已存在该组!"%add_input) print("增加组%s失败!已存在该组!"%add_input) def show(): for y_data in data: for k in y_data: for i in y_data[k]: print("主机IP:[%s],所属组为:[%s]"%(i["ip"],k)) def add_host(): """ 增加主机 :return: """ try: host_add = input("请输入主机IP:").strip() host_port = int(input("请输入端口号:")) host_user = input("请输入登陆主机用户名:").strip() host_pwd = input("请输入登陆主机密码:").strip() # 判断是否为IP if re.match(r"((2[0-4]\\d|25[0-5]|[01]?\\d\\d?)\\.){3}(2[0-4]\\d|25[0-5]|[01]?\\d\\d?)$",host_add): if host_port != "" and len(host_user) != 0 and len(host_pwd)!= 0: host_group = input("请输入主机所属组:").strip() for g in data: if host_group in g.keys(): for g_data in data: if host_group in g_data.keys(): g_data[host_group].append({"ip":"%s"%host_add,"port":"%s"%host_port,"username":"%s"%host_user,"password":"%s"%host_pwd}) with open(db_path,"wb") as fw: pickle.dump(data,fw) log.info("增加主机[%s]成功!"%host_add) print("增加主机[%s]成功!"%host_add) break else: log.error("增加主机[%s]失败,组[%]不存在!"%(host_add,host_group)) print("增加主机[%s]失败,组[%]不存在!!"%(host_add,host_group)) return add_host() else: return add_host() else: log.error("你输入的不是IP地址:%s" %host_add) print("你输入的不是IP地址:%s" %host_add) except Exception as ex: log.error("增加主机异常%s"%ex) print("增加主机异常") def mod_host(): """ 修改主机所属组 :return: """ IP_modi = input("请输入要变更的IP:").strip() ip_check = check_ip(IP_modi) if ip_check: gg = input("请输入转入的组名称:").strip() gg_check = check_group(gg) if gg_check: # 获取该IP 原所属组名称 for data_gg in data: for i_gg in data_gg: for i,ip_data in enumerate(data_gg[i_gg]): if IP_modi == ip_data["ip"]: g = i_gg count = i ip = ip_data if gg == g: log.info("该IP主机:{},原已属于该组:{}".format(IP_modi,gg)) print("该IP主机:{},原已属于该组:{}".format(IP_modi,gg)) else: for x_data in data: for xi_gg in x_data: # 确定转入组相符 if xi_gg == gg: x_data[xi_gg].append(ip) # 删除原来所属组IP主机 elif xi_gg == g: x_data[xi_gg].remove(ip) with open(db_path,"wb") as fw: pickle.dump(data,fw) log.info("修改主机[%s]成功,新组名称为%s!"%(IP_modi,gg)) print("修改主机[%s]成功,新组名称为%s!"%(IP_modi,gg)) else: log.error("不存在此IP主机{}".format(IP_modi)) print("不存在此IP主机{}".format(IP_modi)) def cmd_handle(arg): """ 解析命令,并返回主机IP列表 :param arg: :return: """ if arg[0] == "*": ip_list = [] for g in data: for gg in g: for ip in g[gg]: ip_list.append(ip["ip"]) ip_list = list(set(ip_list)) # 去除重复IP return ip_list elif arg[0] == "-h": ip_list=[] ip_group = arg[1:] for data_ip in data: for ip in ip_group: for ip_data in data_ip: for iptest in data_ip[ip_data]: if ip in iptest["ip"]: ip_list.append(ip) ip_list =list(set(ip_list)) return ip_list elif arg[0] == "-g": ip_list = [] group_list =arg[1:] for group in group_list: for g in data: if group in g.keys(): for ip in g[group]: ip_list.append(ip["ip"]) ip_list = list(set(ip_list)) # 去除重复的IP return ip_list else: ip_list =[] return "" def ip_user(ip): """ 获取主机连接账号信息 :param ip: :return: """ ip_info = [] for data_ip in data: for ip_data in data_ip: for iptest in data_ip[ip_data]: if ip in iptest["ip"]: ip_info = [iptest["ip"],iptest["port"],iptest["username"],iptest["password"]] return ip_info def host_delete(): try: host_add = input("请要删除主机IP:").strip() # 判断是否为IP if<以上是关于python 主机宝的主要内容,如果未能解决你的问题,请参考以下文章
Android Jetpack导航,另一个主机片段内的主机片段