python项目开发:用RabbitMQ实现异步RPC

Posted 野生的马

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python项目开发:用RabbitMQ实现异步RPC相关的知识,希望对你有一定的参考价值。

程序要求:

              1. 用Rabbit MQ实现RPC

              1. 可以异步地执行多条命令

              2. 可以对一次性对多个机器执行命令

程序效果:

              ---》run dir host1 host2 。。。。

              ---》get task_id

              ---》taskId:xxxx   host: xxxxxx

              ---》check task_id

              --->打印结果

程序分析:

              为了达到异步地效果,可以使用多线程或协程,即每执行一条命令就启动一条线程或协程。客户端发送命令到队列、从返回队列接收结果分离,不能写到一起。

业务逻辑:

代码实现:

README

#author:Wu zhiHao

#博客地址:https://www.cnblogs.com/BUPT-MrWu/p/10364619.html

#程序目录框架:
 |--RPC
  |--RPC_server #服务端
   |--bin
    |--start.py #程序入口
   |--core
    |--RpcServer.py #服务端主要逻辑
  |--RPC_client #客户端
   |--bin
    |--start.py #程序入口
   |--core
    |--main.py #程序主要逻辑
   |--modules
    |--RpcClient.py #客户端主要逻辑
   |--conf
    |--settings.py #配置文件
  |--READ_ME

#命令格式:
 1. run command host1 host2..... #执行命令
 2. all_task #获取全部task_id
 3. check task_id #获取命令结果
View Code

RPC_server\\\\bin\\\\start.py

import sys,os
BASE_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(BASE_dir)
from core import RpcServer
if __name__ == \'__main__\':
    obj = RpcServer.RpcServer()
    obj.channel.start_consuming()
View Code

RPC_server\\\\core\\\\RpcServer.py

import pika
import os
import socket
from conf import settings
class RpcServer(object):
    def __init__(self):
        self.credentials = pika.PlainCredentials(settings.RabbitMq_name,settings.RabbitMq_password) #RabbiMQ用户认证
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                settings.RabbitMq_ip,settings.RabbitMq_port,"/",self.credentials,
            )
        )
        self.My_Ip = self.get_ip() #获取服务端IP地址
        self.channel = self.connection.channel()
        self.result = self.channel.queue_declare(exclusive=True)
        self.queue_name = self.result.method.queue
        self.channel.exchange_declare(
            exchange="Rpc",
            exchange_type="direct",
        )
        self.channel.queue_bind(
            exchange="Rpc",
            queue=self.queue_name,
            routing_key=self.My_Ip,
        )
        self.channel.basic_consume(
            self.on_response,
            queue=self.queue_name,
        )

    def on_response(self,ch,method,properties,body):
        command = body.decode()
        command_result = self.on_request(command)
        self.channel.basic_publish(
            exchange="",
            routing_key=properties.reply_to,
            properties=pika.BasicProperties(
                correlation_id=properties.correlation_id,
            ),
            body=command_result
        )


    def on_request(self,command):
        return os.popen(command).read()

    def get_ip(self):
        computer_name = socket.getfqdn(socket.gethostname(  ))
        computer_Ip = socket.gethostbyname(computer_name)
        return computer_Ip
View Code

RPC_client\\\\bin\\\\start.py

import sys,os

BASE_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(BASE_dir)
from core import main
if __name__ == \'__main__\':
    obj = main.run()
    obj.start()
View Code

RPC_client\\\\core\\\\main.py

import random
import threading
from modules import RpcClient

class run(object):
    def __init__(self):
        self.client = RpcClient.RpcClient()
        self.information = {}

    def start(self):
        while True:
            try:
                command = input("-->")
                if not command:continue
                t = threading.Thread(target=self.select,args=(command,))
                t.start()
            except Exception as e:
                print(e)

    def select(self,command):
        \'\'\'解析命令\'\'\'
        try:
            keyword = command.split()[0]
            func = getattr(self,keyword)
            func(command)
        except Exception as e:
            print(e)

    def run(self,command):
        \'\'\'执行命令\'\'\'
        try:
            task_id = str(random.randint(100,1000))
            self.information[task_id] = {}
            keyword = command.split()[1]
            for host in command.split()[2:]:
                result = self.client.on_request(host,keyword)
                self.information[task_id][host] = [result[0],result[1]]
        except Exception as e:
            print(e)

    def check(self,command):
        \'\'\'获取命令结果\'\'\'
        try:
            task_id = command.split()[1]
            for host in self.information[task_id]:
                corr_id = self.information[task_id][host][0]
                callback_queue = self.information[task_id][host][1]
                command_result = self.client.get_response(corr_id,callback_queue)
                print("%s:\\n%s"%(host,command_result))
            self.information.pop(task_id) #删除task_id
        except Exception as e:
            print(e)

    def all_task(self,command):
        \'\'\'获取全部task_id\'\'\'
        try:
            for task_id in self.information:
                all_host = []
                for host in self.information[task_id]:
                    all_host.append(host)
                print("task_id: %s  host: %s\\n"%(task_id,all_host))
        except Exception as e:
            print(e)
View Code

RPC_client\\\\conf\\\\settings.py

RabbitMq_name = "XXX" #RabbitMq用户名
RabbitMq_password = "XXX" #rabbitmq用户密码
RabbitMq_ip = "XXX" #RabbitMq端的IP地址
RabbitMq_port = 5672 #RabbitMq端的端口号
View Code

RPC_client\\\\mudules\\\\RpcClient.py

import pika
import uuid
from conf import settings
class RpcClient(object):
    def __init__(self):
        self.credentials = pika.PlainCredentials(settings.RabbitMq_name,settings.RabbitMq_password) #RabbiMQ用户认证
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                settings.RabbitMq_ip,settings.RabbitMq_port,"/",self.credentials,
            )
        )
        self.channel = self.connection.channel()

    def get_response(self,corr_id,callback_queue):
        \'\'\'从队列里取值\'\'\'
        self.corr_id = corr_id
        self.response = None
        self.channel.basic_consume(
            self.on_response,
            queue=callback_queue,
        )
        while self.response is None:
            self.connection.process_data_events() #非阻塞版的start_consuming
        return self.response

    def on_response(self,ch,method,properties,body):
        \'\'\'当队列里有数据时执行\'\'\'
        if self.corr_id == properties.correlation_id:
            self.response = body.decode()

    def on_request(self,host,command):
        \'\'\'发送命令\'\'\'
        result = self.channel.queue_declare(exclusive=False) #生成另一个queue时,这个queue不会消失
        callback_queue = result.method.queue #返回queue
        corr_id = str(uuid.uuid4()) #验证码
        self.channel.exchange_declare(
            exchange="Rpc",
            exchange_type="direct"
        )
        self.channel.basic_publish(
            exchange="Rpc",
            routing_key=host,
            properties=pika.BasicProperties(
                correlation_id=corr_id,
                reply_to=callback_queue,
            ),
            body=command,
        )
        return corr_id,callback_queue #返回验证值和返回queue
View Code

程序执行实例:

以上是关于python项目开发:用RabbitMQ实现异步RPC的主要内容,如果未能解决你的问题,请参考以下文章

Celery+Rabbitmq实现异步任务

Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控

Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控

python测试开发django-159.Celery 异步与 RabbitMQ 环境搭建

Celery异步任务队列/周期任务+ RabbitMQ + Django

用 Python RabbitMQ 和 Nameko 实现微服务