RabbitMQ4--发后即忘和RPC

Posted 读书使人进步

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ4--发后即忘和RPC相关的知识,希望对你有一定的参考价值。

  在项目中引入RabbitMQ通常会考虑它会带来的好处:解耦应用程序,实现不同编程语言之间的互通,解除对特定通信协议的依赖,解除应用程序在时序上执行的依赖(异步).落实到代码层面就是两种常用应用模式:"发后即忘"(fire-and-forget)和RPC.

fire-and-forget 

  RabbitMQ 解决的是应用程序之间互联(connect)和规模(scale)的问题,消息发送和接收是隔离,发送方不知道消息最终由谁接收,接收方也不必关心消息是谁步发出的;发送和接收是隔离的,消息本质上就是异步的.这种隔离也就解耦了应用程序之间的依赖.RabbitMQ的角色就是应用程序中间的路由器.对于消息的发布方来讲这是一种"发后即忘"(fire-and_forget)的发布方式.

RPC

  RPC需要双向通信,或者说RPC Server需要明确知道要把消息发送给谁.我们可以在payload的数据部分附加"发给谁" 这种EndPoint信息. RabbitMQ提供的解决方案:在每一个AMQP的消息头上有一个reply_to字段.这样消息的producer就可以指定Queue name,RPC Server接受到消息检查reply_to字段,创建一个消息包含Response并把queue name作为routing key,订阅了这个队列的Client就拿到了消息.

  这里有两件事情要保证:

  1. 要为队列创建随机Name
  2. 即使Name随机还是有可能冲突,还需要保证消息通信的独占性。

  看看RabbitMQ是怎么满足这两点的:

  1. 如果创建的队列不指定queue name,RabbitMQ就会创建一个随机的Name.
  2.  独占只需要exclusive参数即可

  总而言之,需要做的就是Client创建一个temporary,exclusive,anonymou的queue,并把queue name设置在RPC 消息的reply_to字段即可.注意这里RPC Server已经知道要投递到哪个Queue,所以不需要指定Exchange(后面我们会提到在实现层面Queue和Exchange的不同,简单讲 queue会有对应的Erlang进程,而exchang只是执行一些模式匹配的检查并没有进程实体对应).看下图:

  Our RPC will work like this:

  • When the Client starts up, it creates an anonymous exclusive callback queue.
  • For an RPC request, the Client sends a message with two properties: replyTo, which is set to the callback queue and correlationId, which is set to a unique value for every request.
  • The request is sent to an rpc_queue queue.
  • The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the replyTo field.
  • The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application

 RPC Client端的代码如下:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;

public class RPCClient {
  private Connection connection;
  private Channel channel;
  private String requestQueueName = "rpc_queue";
  private String replyQueueName;
  private QueueingConsumer consumer;

  public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue();
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
  }

  public String call(String message) throws Exception {
    String response = null;
    String corrId = UUID.randomUUID().toString();
    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      if (delivery.getProperties().getCorrelationId().equals(corrId)) {
        response = new String(delivery.getBody(),"UTF-8");
        break;
      }
    }
    return response;
  }

  public void close() throws Exception {
    connection.close();
  }

  public static void main(String[] argv) {
    RPCClient fibonacciRpc = null;
    String response = null;
    try {
      fibonacciRpc = new RPCClient();

      System.out.println(" [x] Requesting fib(30)");
      response = fibonacciRpc.call("30");
            System.out.println(" [.] Got \'" + response + "\'");
        }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (fibonacciRpc!= null) {
        try {
          fibonacciRpc.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
}
View Code

RPC SEVER端代码如下:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";
    //计算斐波切纳数列
    private static int fib(int n) {
        if (n == 0)
            return 0;
        if (n == 1)
            return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);

            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
            System.out.println(" [x] Awaiting RPC requests");

            while (true) {
                String response = null;
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();
                BasicProperties replyProps = new BasicProperties.Builder()
                        .correlationId(props.getCorrelationId()).build();

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + message + ")");
                    response = "" + fib(n);
                } catch (Exception e) {
                    System.out.println(" [.] " + e.toString());
                    response = "";
                } finally {
                    channel.basicPublish("", props.getReplyTo(), replyProps,
                            response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
                            false);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}
View Code

略有不同

  传统的RPC调用Client和Server紧密依赖,客户端连接上服务器,发送一个请求然后阻塞等待服务器响应.这样的做的特点是客户端和服务器端是知道对方的.如果RPC Server崩溃掉,客户端需要重连,如果Server彻底崩掉就要重新找一个提供同样服务的Server,然后客户端重连过去.

  用RabbitMQ来实现RPC,依然保持Client Server信息隐藏的特点,Client依赖的不是特定的Server而是特定的消息,在有多个等效Server的情况下,一个Server的状态是否正常不会影响到客户端的状态.

  总结一下,使用RabbitMQ是先RPC,客观上还实现了下面的效果:

  1. 容错 一个Server崩溃不影响 Client
  2. 解耦了对特定通信协议和接口的依赖,统一走AMQP消息.
  3. 在多个RPC Server之间的负载均衡由RabbitMQ完成

以上是关于RabbitMQ4--发后即忘和RPC的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ实战:消息通信模式和最佳实践

Kafka 消息丢失与消费精确一次性

Kafka 消息丢失与消费精确一次性

kafka传递消息的三种方式

跨页回发后Control.UniqueID不同

RadDatePicker 和 RadTimePicker 在回发后失去样式