rabbitmq学习:利用rabbitmq实现远程rpc调用

Posted wutianqi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq学习:利用rabbitmq实现远程rpc调用相关的知识,希望对你有一定的参考价值。

一、rabbitmq实现rpc调用的原理

·rabbitmq实现rpc的原理是:客户端向一个队列中发送消息,并注册一个回调的队列用于接收服务端返回的消息,该消息需要声明一个叫做correaltionId的属性,该属性将是该次请求的唯一标识。服务端在接受到消息(在需要时可以验证correaltionId)后,处理消息,并将消息发送到客户端注册的回调队列中。原理图如下:  

  技术分享图片

二、代码实现

  下面我们将模拟实现一个rpc客户端和rpc服务端。客户端给服务端发送message,服务端收到后处理message,再将处理后的消息返给客户端

  rpc客户端

  

/**
 * rpc客户端
 */
public class RpcClient {
    //发送消息的队列名称
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
           connection = connectionFactory.newConnection();
           channel = connection.createChannel();
           //创建回调队列
           String callbackQueue = channel.queueDeclare().getQueue();
           //创建回调队列,消费者从回调队列中接收服务端传送的消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(callbackQueue,true,consumer);

            //创建消息带有correlationId的消息属性
            String correlationId = UUID.randomUUID().toString();
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(callbackQueue).build();
            String message = "hello rabbitmq";
            channel.basicPublish("",RPC_QUEUE_NAME,basicProperties,message.getBytes());
            System.out.println("RpcClient send message " + message + ", correaltionId = " + correlationId);

            //接收回调消息
            while (true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String receivedCorrelationId = delivery.getProperties().getCorrelationId();
                if(correlationId.equals(receivedCorrelationId)){
                    System.out.println("RpcClient receive format message " + new String(delivery.getBody(), "UTF-8") + ", correaltionId = " + correlationId);
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
}

 

   rpc服务端

  

/**
 * rpc服务器
 */
public class RpcServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static String format(String message){
        return "......" + message + "......";
    }

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //声明消费者预取的消息数量
            channel.basicQos(1);
            channel.basicConsume(RPC_QUEUE_NAME,false,consumer);//采用手动回复消息
            System.out.println("RpcServer waitting for receive message");

            while (true){
                //接收并处理消息
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("RpcServer receive message " + message);
                String response = format(message);
                //确认收到消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

                //取出消息的correlationId
                AMQP.BasicProperties properties = delivery.getProperties();
                String correlationId = properties.getCorrelationId();

                //创建具有与接收消息相同的correlationId的消息属性
                AMQP.BasicProperties replyProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).build();
                channel.basicPublish("",properties.getReplyTo(),replyProperties,response.getBytes());
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

   先运行服务端,再运行客户端,结果如下:

  RpcClient

  技术分享图片

  RpcServer

  技术分享图片

  

 


以上是关于rabbitmq学习:利用rabbitmq实现远程rpc调用的主要内容,如果未能解决你的问题,请参考以下文章

利用RabbitMQ实现RPC(python)

利用RabbitMQ实现RPC(python)

RPC使用rabbitmq实现

rabbitmq学习:rabbitmq(消息队列)的作用以及rabbitmq之直连交换机

C#利用RabbitMQ实现点对点消息传输

RabbitMQ学习 (远程过程调用(RPC))