要求:
可以异步的执行多个命令
对多台机器
>>:run "df -h" --hosts 192.168.3.55 10.4.3.4
task id: 45334
>>: check_task 45334
>>:
思考:
1、分解其中需要实现的功能
(1)命令是发到远程主机上执行的,命令放在队列里,再发到主机处理,主机执行完结果放在队列里,提交命令的人自取。
就需要2个进程,一个client,提交命令,取结果,一个server,处理命令,放结果
(2)发送命令的时候,exchange决定往哪个队列放消息,每个server取自己的命令,用ip作为筛选的binding_key
(3)取结果的时候,就用默认的exchange,直接往reply_to的队列里放
server端
1 #rabbitMQserver=‘10.21.147.189‘ 2 rabbitMQserver=‘localhost‘ 3 4 import pika 5 import os 6 import socket 7 8 class server(object): 9 def __init__(self): 10 self.connection=pika.BlockingConnection(pika.ConnectionParameters(rabbitMQserver)) 11 self.channel=self.connection.channel() 12 self.channel.exchange_declare(exchange=‘cmd‘,exchange_type=‘topic‘) 13 self.queue_default = self.channel.queue_declare(exclusive=True) 14 self.quname = self.queue_default.method.queue 15 # 获取本机ip作为binding_key,binding_key的格式是以点分隔的一系列字符串,client发过来的消息,routing_key满足server的binding_key匹配原则,就会加入server连的那个queue 16 self.hostip=socket.gethostbyname(socket.gethostname()) 17 self.binding_key=‘#.‘+self.hostip+‘.#‘ 18 print("binding_key: %s" % self.binding_key) 19 self.channel.queue_bind(queue=self.quname, exchange=‘cmd‘, routing_key=self.binding_key) 20 self.channel.basic_qos(prefetch_count=1) 21 22 self.channel.basic_consume(self.execcmd,queue=self.quname,no_ack=False) 23 self.channel.start_consuming() 24 return 25 26 def execcmd(self,ch,method,props,body): 27 cmd=bytes.decode(body) 28 print("[*] Received %s" % cmd) 29 result = os.popen(cmd).read() 30 print(result) 31 ch.basic_publish( 32 exchange=‘‘, 33 routing_key=props.reply_to, 34 properties=pika.BasicProperties( 35 correlation_id=props.correlation_id 36 ), 37 body=result 38 ) 39 ch.basic_ack(delivery_tag=method.delivery_tag) 40 return 41 42 se=server()
client端
1 import pika 2 import uuid 3 4 #rabbitMQserver=‘10.21.147.189‘ 5 rabbitMQserver=‘localhost‘ 6 7 class client(object): 8 def __init__(self): 9 self.cmdid={} 10 self.connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitMQserver)) 11 self.channel = self.connection.channel() 12 self.channel.exchange_declare(exchange=‘cmd‘, exchange_type=‘topic‘) 13 14 #回写result的队列用默认生成的,需要取得名字,exclusive定为True表示只能有本client消费这个queue 15 self.result=self.channel.queue_declare(exclusive=True) 16 self.resultqueue=self.result.method.queue 17 return 18 19 def showcmdid(self): 20 print("-----命令标识如下:--------") 21 for id,hosts in self.cmdid.items(): 22 print("cmdid:%s run on hosts: %s"%(id,hosts)) 23 return 24 25 def callcmd(self,cmdinput): 26 #如果是call命令,格式是 call+"命令"+ --host + 一串ip地址 27 #首先判断格式对不对,至少4个参数 28 msglist=cmdinput.split() 29 argnum=len(msglist) 30 if argnum<4: 31 print("Wrong cmd. Input again.") 32 return 33 #其次,命令是第二个参数,命令用双引号标记,所以要strip 34 msg = msglist[1].strip("\"") 35 #然后,第三个是--host,第四个开始是ip,ip当作routing_key 36 routing_key=msglist[3] 37 i=4 38 while i < argnum: 39 routing_key=routing_key+‘.‘+msglist[i] 40 i+=1 41 print("routing_key: %s"%routing_key) 42 #再然后,生成一个随机数,把他作为消息的属性参数 43 self.corr_id=str(uuid.uuid4()) 44 self.cmdid[self.corr_id]=msglist[3:] 45 print("命令标识:%s"%self.corr_id) 46 #然后,把消息发到exchange,routing_key,corr_id当作参数发布 47 self.channel.basic_publish( 48 exchange=‘cmd‘, 49 routing_key=routing_key, 50 body=msg, 51 properties=pika.BasicProperties( 52 reply_to=self.resultqueue, 53 correlation_id=self.corr_id 54 ) 55 ) 56 print("[*] Send message %s" % msg) 57 return 58 59 def on_response(self,ch,method,props,body): 60 if self.targetcmdid==props.correlation_id: 61 self.response=bytes.decode(body) 62 print(self.response) 63 ch.basic_ack(delivery_tag=method.delivery_tag) 64 return 65 66 def getres(self,cmdinput): 67 msglist = cmdinput.split() 68 self.targetcmdid=msglist[1] 69 self.response=None 70 self.channel.basic_consume(self.on_response,queue=self.resultqueue) 71 while self.response is None: 72 self.connection.process_data_events() 73 return 74 75 cl=client() 76 while True: 77 msginput=input(">>: ").strip() 78 if msginput.startswith(‘call‘): 79 cl.callcmd(msginput) 80 elif msginput.startswith(‘get‘): 81 cl.getres(msginput) 82 elif msginput.startswith(‘show‘): 83 cl.showcmdid() 84 elif msginput.startswith(‘exit‘): 85 cl.connection.close() 86 exit(0) 87 else: 88 print("Wrong cmd. Input again.") 89 continue
目前没有验证远程登陆rabbitMQ server的情况,应该是需要配置用户名密码,不能用默认的guest/guest。不过以上功能是实现了。