python项目开发:用RabbitMQ实现异步RPC
Posted 野生的马
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python项目开发:用RabbitMQ实现异步RPC相关的知识,希望对你有一定的参考价值。
程序要求:
1. 用Rabbit MQ实现RPC
1. 可以异步地执行多条命令
2. 可以对一次性对多个机器执行命令
程序效果:
---》run dir host1 host2 。。。。
---》get task_id
---》taskId:xxxx host: xxxxxx
---》check task_id
--->打印结果
程序分析:
为了达到异步地效果,可以使用多线程或协程,即每执行一条命令就启动一条线程或协程。客户端发送命令到队列、从返回队列接收结果分离,不能写到一起。
业务逻辑:
代码实现:
README
#author:Wu zhiHao #博客地址:https://www.cnblogs.com/BUPT-MrWu/p/10364619.html #程序目录框架: |--RPC |--RPC_server #服务端 |--bin |--start.py #程序入口 |--core |--RpcServer.py #服务端主要逻辑 |--RPC_client #客户端 |--bin |--start.py #程序入口 |--core |--main.py #程序主要逻辑 |--modules |--RpcClient.py #客户端主要逻辑 |--conf |--settings.py #配置文件 |--READ_ME #命令格式: 1. run command host1 host2..... #执行命令 2. all_task #获取全部task_id 3. check task_id #获取命令结果
RPC_server\\\\bin\\\\start.py
import sys,os BASE_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) sys.path.append(BASE_dir) from core import RpcServer if __name__ == \'__main__\': obj = RpcServer.RpcServer() obj.channel.start_consuming()
RPC_server\\\\core\\\\RpcServer.py
import pika import os import socket from conf import settings class RpcServer(object): def __init__(self): self.credentials = pika.PlainCredentials(settings.RabbitMq_name,settings.RabbitMq_password) #RabbiMQ用户认证 self.connection = pika.BlockingConnection( pika.ConnectionParameters( settings.RabbitMq_ip,settings.RabbitMq_port,"/",self.credentials, ) ) self.My_Ip = self.get_ip() #获取服务端IP地址 self.channel = self.connection.channel() self.result = self.channel.queue_declare(exclusive=True) self.queue_name = self.result.method.queue self.channel.exchange_declare( exchange="Rpc", exchange_type="direct", ) self.channel.queue_bind( exchange="Rpc", queue=self.queue_name, routing_key=self.My_Ip, ) self.channel.basic_consume( self.on_response, queue=self.queue_name, ) def on_response(self,ch,method,properties,body): command = body.decode() command_result = self.on_request(command) self.channel.basic_publish( exchange="", routing_key=properties.reply_to, properties=pika.BasicProperties( correlation_id=properties.correlation_id, ), body=command_result ) def on_request(self,command): return os.popen(command).read() def get_ip(self): computer_name = socket.getfqdn(socket.gethostname( )) computer_Ip = socket.gethostbyname(computer_name) return computer_Ip
RPC_client\\\\bin\\\\start.py
import sys,os BASE_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) sys.path.append(BASE_dir) from core import main if __name__ == \'__main__\': obj = main.run() obj.start()
RPC_client\\\\core\\\\main.py
import random import threading from modules import RpcClient class run(object): def __init__(self): self.client = RpcClient.RpcClient() self.information = {} def start(self): while True: try: command = input("-->") if not command:continue t = threading.Thread(target=self.select,args=(command,)) t.start() except Exception as e: print(e) def select(self,command): \'\'\'解析命令\'\'\' try: keyword = command.split()[0] func = getattr(self,keyword) func(command) except Exception as e: print(e) def run(self,command): \'\'\'执行命令\'\'\' try: task_id = str(random.randint(100,1000)) self.information[task_id] = {} keyword = command.split()[1] for host in command.split()[2:]: result = self.client.on_request(host,keyword) self.information[task_id][host] = [result[0],result[1]] except Exception as e: print(e) def check(self,command): \'\'\'获取命令结果\'\'\' try: task_id = command.split()[1] for host in self.information[task_id]: corr_id = self.information[task_id][host][0] callback_queue = self.information[task_id][host][1] command_result = self.client.get_response(corr_id,callback_queue) print("%s:\\n%s"%(host,command_result)) self.information.pop(task_id) #删除task_id except Exception as e: print(e) def all_task(self,command): \'\'\'获取全部task_id\'\'\' try: for task_id in self.information: all_host = [] for host in self.information[task_id]: all_host.append(host) print("task_id: %s host: %s\\n"%(task_id,all_host)) except Exception as e: print(e)
RPC_client\\\\conf\\\\settings.py
RabbitMq_name = "XXX" #RabbitMq用户名 RabbitMq_password = "XXX" #rabbitmq用户密码 RabbitMq_ip = "XXX" #RabbitMq端的IP地址 RabbitMq_port = 5672 #RabbitMq端的端口号
RPC_client\\\\mudules\\\\RpcClient.py
import pika import uuid from conf import settings class RpcClient(object): def __init__(self): self.credentials = pika.PlainCredentials(settings.RabbitMq_name,settings.RabbitMq_password) #RabbiMQ用户认证 self.connection = pika.BlockingConnection( pika.ConnectionParameters( settings.RabbitMq_ip,settings.RabbitMq_port,"/",self.credentials, ) ) self.channel = self.connection.channel() def get_response(self,corr_id,callback_queue): \'\'\'从队列里取值\'\'\' self.corr_id = corr_id self.response = None self.channel.basic_consume( self.on_response, queue=callback_queue, ) while self.response is None: self.connection.process_data_events() #非阻塞版的start_consuming return self.response def on_response(self,ch,method,properties,body): \'\'\'当队列里有数据时执行\'\'\' if self.corr_id == properties.correlation_id: self.response = body.decode() def on_request(self,host,command): \'\'\'发送命令\'\'\' result = self.channel.queue_declare(exclusive=False) #生成另一个queue时,这个queue不会消失 callback_queue = result.method.queue #返回queue corr_id = str(uuid.uuid4()) #验证码 self.channel.exchange_declare( exchange="Rpc", exchange_type="direct" ) self.channel.basic_publish( exchange="Rpc", routing_key=host, properties=pika.BasicProperties( correlation_id=corr_id, reply_to=callback_queue, ), body=command, ) return corr_id,callback_queue #返回验证值和返回queue
程序执行实例:
以上是关于python项目开发:用RabbitMQ实现异步RPC的主要内容,如果未能解决你的问题,请参考以下文章
Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控
Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控
python测试开发django-159.Celery 异步与 RabbitMQ 环境搭建