RabbitMQ之工作队列

Posted zengnansheng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ之工作队列相关的知识,希望对你有一定的参考价值。

在这一篇,我们将创建一个工作队列,将队列的消息让多个消费者来接收。一个消息对应一个消费者接收!

 

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class Producer {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws java.io.IOException, Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        // 分发消息
        for (int i = 1; i <= 5; i++) {
            String message = "Hello World! " + i;
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("P Sent ‘" + message + "‘");
        }
        channel.close();
        connection.close();
    }
}

 

 

2个消费者

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer1 {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Consumer1 Waiting for messages...");
        // 每次从队列中获取数量
        channel.basicQos(1);

        final DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                System.out.println("Consumer1 Received ‘" + message + "‘");
                try {
                    try {
                        Thread.sleep(1000); // 暂停1秒钟
                    } catch (InterruptedException _ignored) {
                        Thread.currentThread().interrupt();
                    }
                } finally {
                    System.out.println("Consumer1 Done");
                    // 消息处理完成确认
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // 消息消费完成确认
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }
}
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer2 {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Consumer2 Waiting for messages...");
        // 每次从队列中获取数量
        channel.basicQos(1);

        final DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                System.out.println("Consumer2 Received ‘" + message + "‘");
                try {
                    try {
                        Thread.sleep(1000); // 暂停1秒钟
                    } catch (InterruptedException _ignored) {
                        Thread.currentThread().interrupt();
                    }
                } finally {
                    System.out.println("Consumer2 Done");
                    // 消息处理完成确认
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // 消息消费完成确认
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }
}

 

启动RabbitMQ服务

 

启动消费者Consumer1和Consumer2

 

启动生产者Producer

 

 

可以看到打印日志信息

 

生产者Producer

P Sent ‘Hello World! 1‘

P Sent ‘Hello World! 2‘

P Sent ‘Hello World! 3‘

P Sent ‘Hello World! 4‘

P Sent ‘Hello World! 5‘

 

 

消费者Consumer1

Consumer1 Waiting for messages...

Consumer1  Received ‘Hello World! 1‘

Consumer1  Done

Consumer1  Received ‘Hello World! 3‘

Consumer1  Done

Consumer1  Received ‘Hello World! 5‘

Consumer1  Done

 

 

 

消费者Consumer2

Consumer2  Waiting for messages...

Consumer2  Received ‘Hello World! 2‘

Consumer2  Done

Consumer2  Received ‘Hello World! 4‘

Consumer2  Done

 

 

 

消息确认

 

如果处理一条消息需要几秒钟的时间,你可能会想,如果在处理消息的过程中,消费者服务器、网络、网卡出现故障挂了,那可能这条正在处理的消息或者任务就没有完成,就会失去这个消息和任务。 

为了确保消息或者任务不会丢失,RabbitMQ支持消息确认–ACK。ACK机制是消费者端从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。如果一个消费者在处理消息时挂掉(网络不稳定、服务器异常、网站故障等原因导致频道、连接关闭或者TCP连接丢失等),那么他就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将此消息重新放入队列中。如果有其他消费者同时在线,RabbitMQ会立即将这个消息推送给这个在线的消费者。这种机制保证了在消费者服务器故障的时候,能不丢失任何消息和任务。 

如果RabbitMQ向消费者发送消息时,消费者服务器挂了,消息也不会有超时;即使一个消息需要非常长的时间处理,也不会导致消息超时。这样消息永远不会从RabbitMQ服务器中删除。只有当消费者正确的发送ACK确认反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。 

消息的ACK确认机制默认是打开的。在上面的代码中,我们显示返回autoAck=true 这个标签。

 

忘记通过basicAck返回确认信息是常见的错误。这个错误非常严重,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息。 

如果要监控这种错误,可以使用rabbitmqctl messages_unacknowledged命令打印出出相关的信息。

 

消息持久化

 

当RabbitMQ服务器挂了,她可能就丢失所有队列中的消息和任务。如果你想让RabbitMQ记住她当前的状态和内容,就需要通过2件事来确保消息和任务不会丢失。 

1 在队列声明时,告诉RabbitMQ,这个队列需要持久化:

boolean durable = true;

channel.queueDeclare("hello", durable, false, false, null);

2 重新声明另一个名称的队列,不过这需要修改生产者和消费者的代码,所以,在开发时,最好是将队列名称放到配置文件中。 

 

 

标记为持久化后的消息也不能完全保证不会丢失。虽然已经告诉RabbitMQ消息要保存到磁盘上,

但是理论上,RabbitMQ已经接收到生产者的消息,但是还没有来得及保存到磁盘上,服务器就挂了(比如机房断电),

那么重启后,RabbitMQ中的这条未及时保存的消息就会丢失。因为RabbitMQ不做实时立即的磁盘同步(fsync)。

这种情况下,对于持久化要求不是特别高的简单任务队列来说,还是可以满足的。

如果需要更强大的保证,那么你可以考虑使用生产者确认反馈机制。

 

负载均衡

 

默认情况下,RabbitMQ将队列消息随机分配给每个消费者,这时可能出现消息调度不均衡的问题。

例如有两台消费者服务器,一个服务器可能非常繁忙,消息不断,另外一个却很悠闲,没有什么负载。

RabbitMQ不会主动介入这些情况,还是会随机调度消息到每台服务器。 

这是因为RabbitMQ此时只负责调度消息,不会根据ACK的反馈机制来分析那台服务器返回反馈慢,是不是处理不过来

 

为了解决这个问题,我们可以使用【prefetchcount = 1】这个设置。这个设置告诉RabbitMQ,不要一次将多个消息发送给一个消费者。

这样做的好处是只有当消费者处理完成当前消息并反馈后,才会收到另外一条消息或任务。这样就避免了负载不均衡的事情了。

 

int prefetchCount = 1;

channel.basicQos(prefetchCount);

 

如果所有的消费者负载都很高,你的队列很可能会被塞满。这时你需要增加更多的消费者或者其他方案。

 

以上是关于RabbitMQ之工作队列的主要内容,如果未能解决你的问题,请参考以下文章

rabbitmq系列二 之工作队列

RabbitMQ指南之二:工作队列(Work Queues)

RabbitMq之工作队列(轮询发送消息)

RabbitMQ指南之二:工作队列(Work Queues)

RabbitMQ入门工作队列

RabbitMQ : 订阅者模式之分发模式 ( fanout )