利用RabbitMQ实现RPC(python)
Posted 进击的大杂烩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用RabbitMQ实现RPC(python)相关的知识,希望对你有一定的参考价值。
RPC——远程过程调用,通过网络调用运行在另一台计算机上的程序的函数\方法,是构建分布式程序的一种方式。RabbitMQ是一个消息队列系统,可以在程序之间收发消息。利用RabbitMQ可以实现RPC。本文所有操作都是在CentOS7.3上进行的,示例代码语言为Python。
RabbiMQ以及pika模块安装
yum install rabbitmq-server python-pika -y systemctl start rabbitmq-server |
RPC的基本实现
RPC的服务端代码如下:
#!/usr/bin/env python importpika
connection =pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel =connection.channel() channel.queue_declare(queue='rpc_queue')
deffun(n): return2*n
defon_request(ch, method, props, body): n =int(body) response = fun(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming() |
以上代码中,首先与RabbitMQ服务建立连接,然后定义了一个函数fun(),fun()功能很简单,输入一个数然后返回该数的两倍,这个函数就是我们要远程调用的函数。on_request()是一个回调函数,它作为参数传递给了basic_consume(),当basic_consume()在队列中消费1条消息时,on_request()就会被调用,on_request()从消息内容body中获取数字,并传给fun()进行计算,并将返回值作为消息内容发给调用方指定的接收队列,队列名称保存在变量props.reply_to中。
RPC的客户端代码如下:
#!/usr/bin/env python importpika importuuid
classRpcClient(object): def__init__(self): self.connection= pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel=self.connection.channel()
result =self.channel.queue_declare(exclusive=True) self.callback_queue=result.method.queue
self.channel.basic_consume(self.on_response,no_ack=True, queue=self.callback_queue)
defon_response(self,ch, method, props, body): ifself.corr_id==props.correlation_id: self.response= body
defcall(self,n): self.response=None self.corr_id=str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(n)) whileself.responseisNone: self.connection.process_data_events() returnstr(self.response)
rpc=RpcClient()
print(" [x] Requesting") response =rpc.call(2) print(" [.] Got %r"% response) |
代码开始也是连接RabbitMQ,然后开始消费消息队列callback_queue中的消息,该队列的名字通过Request的属性reply_to传递给服务端,就是在上面介绍服务端代码时提到过的props.reply_to,作用是告诉服务端把结果发到这个队列。basic_consume()的回调函数变成了on_response(),这个函数从callback_queue的消息内容中获取返回结果。
函数call实际发起请求,把数字n发给服务端程序,当response不为空时,返回response值。
下面看运行效果,先启动服务端:
在另一个窗口中运行客户端:
成功调用了服务端的fun()并得到了正确结果(fun(2)结果为4)。
总结:RPC的实现过程可以用下图来表示(图片来自RabbitMQ官网):
当客户端启动时,它将创建一个callbackqueue用于接收服务端的返回消息Reply,名称由RabbitMQ自动生成,如上图中的amq.gen-Xa2..。同一个客户端可能会发出多个Request,这些Request的Reply都由callbackqueue接收,为了互相区分,就引入了correlation_id属性,每个请求的correlation_id值唯一。这样,客户端发起的Request就带由2个关键属性:reply_to告诉服务端向哪个队列返回结果;correlation_id用来区分是哪个Request的返回。
稍微复杂点的RPC
如果服务端定义了多个函数供远程调用怎么办?有两种思路,一种是利用Request的属性app_id传递函数名,另一种是把函数名通过消息内容发送给服务端。
我们先实现第一种,服务端代码如下:
#!/usr/bin/env python importpika
connection =pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel =connection.channel() channel.queue_declare(queue='rpc_queue')
defa(): return"a"
defb(): return"b"
defon_request(ch, method, props, body): funname=props.app_id iffunname=="a": response =a() eliffunname=="b": response =b()
ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id= \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests") channel.start_consuming() |
这次我们定义了2个不同函数a()和b(),分别打印不同字符串,根据接收到的app_id来决定调用哪一个。
客户端代码:
#!/usr/bin/env python importpika importuuid
classRpcClient(object): def__init__(self): self.connection= pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel=self.connection.channel()
result =self.channel.queue_declare(exclusive=True) self.callback_queue=result.method.queue
self.channel.basic_consume(self.on_response,no_ack=True, queue=self.callback_queue)
defon_response(self,ch, method, props, body): ifself.corr_id==props.correlation_id: self.response= body
defcall(self,name): self.response=None self.corr_id=str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, app_id=str(name), ), body="request") whileself.responseisNone: self.connection.process_data_events() returnstr(self.response)
rpc=RpcClient()
print(" [x] Requesting") response =rpc.call("b") print(" [.] Got %r"% response) |
函数call()接收参数name作为被调用的远程函数的名字,通过app_id传给服务端程序,这段代码里我们选择调用服务端的函数b(),rpc.call(“b”)。
执行结果:
结果显示成功调用了函数b,如果改成rpc.call(“a”),执行结果就会变成:
第二种实现方法,服务端代码:
#!/usr/bin/env python importpika
connection =pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel =connection.channel() channel.queue_declare(queue='rpc_queue')
defa(): return"a"
defb(): return"b"
defon_request(ch, method, props, body): funname=str(body) iffunname=="a": response =a() eliffunname=="b": response =b()
ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id= \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests") channel.start_consuming() |
客户端代码:
#!/usr/bin/env python importpika importuuid
classRpcClient(object): def__init__(self): self.connection= pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel=self.connection.channel()
result =self.channel.queue_declare(exclusive=True) self.callback_queue=result.method.queue
self.channel.basic_consume(self.on_response,no_ack=True, queue=self.callback_queue)
defon_response(self,ch, method, props, body): ifself.corr_id==props.correlation_id: self.response= body
defcall(self,name): self.response=None self.corr_id=str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(name)) whileself.responseisNone: self.connection.process_data_events() returnstr(self.response)
rpc=RpcClient()
print(" [x] Requesting") response =rpc.call("b") print(" [.] Got %r"% response) |
与第一种实现方法的区别就是没有使用属性app_id,而是把要调用的函数名放在消息内容body中,执行结果跟第一种方法一样。
一个简单的实际应用案例
下面我们将编写一个小程序,用于收集多台KVM宿主机上的虚拟机数量和剩余可使用的资源。程序由两部分组成,运行在每台宿主机上的脚本agent.py和管理机上收集信息的脚本collect.py。从RPC的角度,agent.py是服务端,collect.py是客户端。
agent.py代码如下:
#!/usr/bin/python importpika importlibvirt importpsutil importjson import socket importos import sys fromxml.domimportminidom
RabbitMQServer=x.x.x.x
#连接libvirt,libvirt是一个虚拟机、容器管理程序。 defget_conn(): conn =libvirt.open("qemu:///system") if conn ==None: print'--Failed to open connection to QEMU/KVM--' sys.exit(2) else: returnconn
#获取虚拟机数量 defgetVMcount(): conn =get_conn() domainIDs=conn.listDomainsID() returnlen(domainIDs)
#获取分配给所有虚拟机的内存之和 defgetMemoryused(): conn =get_conn() domainIDs=conn.listDomainsID() used_mem=0 for id indomainIDs: dom=conn.lookupByID(id) used_mem+=dom.maxMemory()/(1024*1024) returnused_mem
#获取分配给所有虚拟机的vcpu之和 defgetCPUused(): conn =get_conn() domainIDs=conn.listDomainsID() used_cpu=0 for id indomainIDs: dom=conn.lookupByID(id) used_cpu+=dom.maxVcpus() returnused_cpu
#获取所有虚拟机磁盘文件大小之和 defgetDiskused(): conn =get_conn() domainIDs=conn.listDomainsID() diskused=0 for id indomainIDs: dom=conn.lookupByID(id) xml =dom.XMLDesc(0) doc =minidom.parseString(xml) disks =doc.getElementsByTagName('disk') for disk in disks: ifdisk.getAttribute('device')=='disk': diskfile=disk.getElementsByTagName('source')[0].getAttribute('file') diskused+=dom.blockInfo(diskfile,0)[0]/(1024**3) returndiskused
#使agent.py进入守护进程模式 defdaemonize(stdin='/dev/null',stdout='/dev/null',stderr='/dev/null'): try: pid=os.fork() ifpid>0: sys.exit(0) exceptOSError,e: sys.stderr.write("fork #1 failed: (%d) %s\n"%(e.errno,e.strerror)) sys.exit(1) os.chdir("/") os.umask(0) os.setsid() try: pid=os.fork() ifpid>0: sys.exit(0) exceptOSError,e: sys.stderr.write("fork #2 failed: (%d) %s\n"%(e.errno,e.strerror)) sys.exit(1) for f insys.stdout,sys.stderr,:f.flush() si= file(stdin,'r') so = file(stdout,'a+',0) se = file(stderr,'a+',0) os.dup2(si.fileno(),sys.stdin.fileno()) os.dup2(so.fileno(),sys.stdout.fileno()) os.dup2(se.fileno(),sys.stderr.fileno())
daemonize('/dev/null','/root/kvm/agent.log','/root/kvm/agent.log')
#连接RabbitMQ connection =pika.BlockingConnection(pika.ConnectionParameters(host=RabbitMQServer)) channel =connection.channel() channel.exchange_declare(exchange='kvm',type='fanout') result =channel.queue_declare(exclusive=True) queue_name=result.method.queue channel.queue_bind(exchange='kvm',queue=queue_name)
defon_request(ch,method,props,body): sys.stdout.write(body+'\n') sys.stdout.flush() mem_total=psutil.virtual_memory()[0]/(1024*1024*1024) cpu_total=psutil.cpu_count() statvfs=os.statvfs('/datapool') disk_total=(statvfs.f_frsize*statvfs.f_blocks)/(1024**3) mem_unused=mem_total-getMemoryused() cpu_unused=cpu_total-getCPUused() disk_unused=disk_total-getDiskused() data ={ 'hostname':socket.gethostname(),#宿主机名 'vm':getVMcount(),#虚拟机数量 'available memory':mem_unused,#可用内存 'available cpu':cpu_unused,#可用cpu核数 'available disk':disk_unused#可用磁盘空间 } json_str=json.dumps(data) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=json_str ) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request,queue=queue_name) sys.stdout.write(" [x] Awaiting RPC requests\n") sys.stdout.flush() channel.start_consuming() |
collect.py代码如下:
#!/usr/bin/python importpika importuuid importjson import datetime
RabbitMQServer=x.x.x.x classRpcClient(object): def__init__(self): self.connection= pika.BlockingConnection(pika.ConnectionParameters(host=RabbitMQServer)) self.channel=self.connection.channel() self.channel.exchange_declare(exchange='kvm',type='fanout') result =self.channel.queue_declare(exclusive=True) self.callback_queue=result.method.queue self.channel.basic_consume(self.on_responses,no_ack=True,queue=self.callback_queue) self.responses=[]
defon_responses(self,ch,method,props,body): ifself.corr_id==props.correlation_id: self.responses.append(body)
defcall(self): timestamp =datetime.datetime.strftime(datetime.datetime.now(),'%Y-%m-%dT%H:%M:%SZ') self.corr_id=str(uuid.uuid4()) self.channel.basic_publish(exchange='kvm', routing_key='', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body='%s: receive a request'% timestamp ) #定义超时回调函数 defoutoftime(): self.channel.stop_consuming() self.connection.add_timeout(30,outoftime) self.channel.start_consuming() returnself.responses
rpc=RpcClient() responses =rpc.call() foriin responses: response =json.loads(i) print(" [.] Got %r"% response) |
本文在前面演示的RPC都是只有一个服务端的情况,客户端发起请求后是用一个while循环来阻塞程序以等待返回结果的,当self.response不为None,就退出循环。
如果在多服务端的情况下照搬过来就会出问题,实际情况中我们可能有几十台宿主机,每台上面都运行了一个agent.py,当collect.py向几十个agent.py发起请求时,收到第一个宿主机的返回结果后就会退出上述while循环,导致后续其他宿主机的返回结果被丢弃。这里我选择定义了一个超时回调函数outoftime()来替代之前的while循环,超时时间设为30秒。collect.py发起请求后阻塞30秒来等待所有宿主机的回应。如果宿主机数量特别多,可以再调大超时时间。
脚本运行需要使用的模块pika和psutil安装过程:
yum install -y python-pip python-devel pip install pika wget--no-check-certificate https://pypi.python.org/packages/source/p/psutil/psutil-2.1.3.tar.gz tarzxvf psutil-2.1.3.tar.gz cd psutil-2.1.3/&&python setup.py install |
脚本运行效果演示:
以上是关于利用RabbitMQ实现RPC(python)的主要内容,如果未能解决你的问题,请参考以下文章
Python开发项目:RPC异步执行命令(RabbitMQ双向通信)