rabbitmq系列二 之工作队列

Posted hxinguan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq系列二 之工作队列相关的知识,希望对你有一定的参考价值。

---恢复内容开始---

1、工作队列的简介

  在上一篇中,我们已经写了一个从已知队列中发送和获取消息的程序,在这里,我们创建一个工作队列(work queue), 会发送一些耗时的任务给多个工作者。模型图如下:

                      技术分享图片

  工作队列,由称为任务队列(task queue), 主要是为了避免一些占用大量资源,时间的操作。当我们把任务(task)当作消息发送到队列时, 一个运行在后台的工作者(worker),当你运行多个工作者,任务就会在它们之间共享。

  这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务。

2、准备

  之前的教程中,我们发送了一个包含“Hello World!”的字符串消息。现在,我们将发送一些字符串,把这些字符串当作复杂的任务。我们没有真实的例子,例如图片缩放、pdf文件转换。所以使用time.sleep()函数来模拟这种情况。我们在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时1秒钟。比如"Hello..."就会耗时3秒钟。

  我们对之前教程的send.java做些简单的调整,以便可以发送随意的消息。这个程序会按照计划发送任务到我们的工作队列中。我们把它命名为NewTask.java,代码如下:

技术分享图片
 1 package rabbitmq.main;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 
 9 import rabbitmq.utils.ConnectionUtils;
10 
11 public class NewTask {
12 
13     private static final String QUEUE_NAME = "rabbitmq_queue";
14 
15     public static void main(String[] args) throws IOException, TimeoutException {
16         // 获取一个连接
17         Connection connection = ConnectionUtils.getConnection();
18         // 从连接中获取一个通道
19         Channel channel = connection.createChannel();
20         // 创建队列
21         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
22         String[] messages = getMessage(args);
23         for(String message : messages) {
24             // 往队列里发送消息
25             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
26             System.out.println("Send ‘" + message + "‘");
27         }
28         // 关闭通道
29         channel.close();
30         // 关闭连接
31         connection.close();
32     }
33 
34     private static String[] getMessage(String[] strings) {
35         if (strings.length < 1) {
36             strings = new String[] {
37                     "1 message.",
38                     "2 message..",
39                     "3 message...",
40                     "4 message.",
41                     "5 message..",
42                     "6 message...",
43                     "7 message.",
44                     "8 message..",
45                     "9 message...",
46                     "10 message.",
47             };
48         }
49         
50         return strings;
51         
52     }
53 
54 }
View Code

  我们的代码(receive.java)同样需要做一些改动:它需要为消息体中每一个点号(.)模拟1秒钟的操作。它会从队列中获取消息并执行,我们把它命名为worker1.java和worker2.java,这两个的代码时一摸一样的,代码如下:

技术分享图片
 1 package rabbitmq.main;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.rabbitmq.client.AMQP.BasicProperties;
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.Consumer;
10 import com.rabbitmq.client.DefaultConsumer;
11 import com.rabbitmq.client.Envelope;
12 
13 import rabbitmq.utils.ConnectionUtils;
14 
15 public class Worker1 {
16     private static final String QUEUE_NAME = "rabbitmq_queue";
17 
18     public static void main(String[] args) throws IOException, TimeoutException {
19         // 获取连接
20         Connection connection = ConnectionUtils.getConnection();
21         // 创建 管道
22         Channel channel = connection.createChannel();
23         // 创建声明队列(可有可无)
24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25         Consumer consumer = new DefaultConsumer(channel) {
26             @Override
27             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
28                     throws IOException {
29                 String message = new String(body, "UTF-8");
30                 System.out.println("Received ‘" + message + "‘");
31                 try {
32                     doWork(message);
33                 }catch(Exception exception){
34                     System.out.println(" [x] error");
35                 }finally {
36 //                     System.out.println("Done");
37                 }
38             }
39             
40         };
41         boolean autoAck = true; // acknowledgment is covered below
42         // 监听队列
43         channel.basicConsume(QUEUE_NAME, autoAck, consumer);
44     }
45     
46     private static void doWork(String task) throws InterruptedException {
47         for (char ch : task.toCharArray()) {
48             if (ch == ‘.‘)
49                 Thread.sleep(1000);
50         }
51     }
52 
53 }
View Code

3、循环调度(轮询调度)

  使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。

首先,我们先同时运行worker1.java和worker2.java程序,它们都会从队列中获取消息,到底是不是这样呢?我们看看。需要打开三个终端,两个用来运行worker1.java和worker2.java程序,这两个终端就是我们的两个消费者(consumers)—— C1 和 C2。

  最后执行NewTask.java程序,其中效果如下:

  技术分享图片

  worker1的效果如下:

  技术分享图片

  worker2的效果如下:

技术分享图片

  默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。

4、消息确认 

  当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中移除。这种情况,你只要把一个工作者(worker)停止,正在处理的消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。

  我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。

  为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。

  如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。

  消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。

  消息响应默认是开启的。之前的例子中我们可以使用no_ack=True标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。

  在worker2代码中都做如下修改,当收到第7个或者第八个消息的工作者就挂掉,然后进行测试。

 

技术分享图片
 1 package rabbitmq.main;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.rabbitmq.client.AMQP.BasicProperties;
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.Consumer;
10 import com.rabbitmq.client.DefaultConsumer;
11 import com.rabbitmq.client.Envelope;
12 
13 import rabbitmq.utils.ConnectionUtils;
14 
15 public class Worker2 {
16     private static final String QUEUE_NAME = "rabbitmq_queue";
17 
18     public static void main(String[] args) throws IOException, TimeoutException {
19         // 获取连接
20         Connection connection = ConnectionUtils.getConnection();
21         // 创建 管道
22         Channel channel = connection.createChannel();
23         channel.basicQos(1); // accept only one unack-ed message at a time (see below)
24         // 创建声明队列(可有可无)
25         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
26         Consumer consumer = new DefaultConsumer(channel) {
27             @Override
28             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
29                     throws IOException {
30                 String message = new String(body, "UTF-8");
31                 System.out.println("Received ‘" + message + "‘");
32                 if(message.contains("7") || message.contains("8")) {
33                      System.exit(0);
34                 }
35                 try {
36                     doWork(message);
37                 }catch(Exception exception){
38                     System.out.println(" [x] error");
39                 }finally {
40 //                     System.out.println(" [x] Done");
41                      channel.basicAck(envelope.getDeliveryTag(), false);
42                 }
43             }
44             
45         };
46         boolean autoAck = false; // acknowledgment is covered below
47         // 监听队列
48         channel.basicConsume(QUEUE_NAME, autoAck, consumer);
49     }
50     
51     private static void doWork(String task) throws InterruptedException {
52         for (char ch : task.toCharArray()) {
53             if (ch == ‘.‘)
54                 Thread.sleep(1000);
55         }
56     }
57 
58 }
View Code

  

work1的代码如下:

技术分享图片
 1 package rabbitmq.main;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.rabbitmq.client.AMQP.BasicProperties;
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.Consumer;
10 import com.rabbitmq.client.DefaultConsumer;
11 import com.rabbitmq.client.Envelope;
12 
13 import rabbitmq.utils.ConnectionUtils;
14 
15 public class Worker1 {
16     private static final String QUEUE_NAME = "rabbitmq_queue";
17 
18     public static void main(String[] args) throws IOException, TimeoutException {
19         // 获取连接
20         Connection connection = ConnectionUtils.getConnection();
21         // 创建 管道
22         Channel channel = connection.createChannel();
23         channel.basicQos(1); // accept only one unack-ed message at a time (see below)
24         // 创建声明队列(可有可无)
25         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
26         
27         Consumer consumer = new DefaultConsumer(channel) {
28             @Override
29             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
30                     throws IOException {
31                 String message = new String(body, "UTF-8");
32                 System.out.println("Received ‘" + message + "‘");
33                 try {
34                     doWork(message);
35                 }catch(Exception exception){
36                     System.out.println(" [x] error");
37                 }finally {
38 //                     System.out.println("Done");
39                      channel.basicAck(envelope.getDeliveryTag(), false);
40                 }
41             }
42             
43         };
44         boolean autoAck = false; // acknowledgment is covered below
45         // 监听队列
46         channel.basicConsume(QUEUE_NAME, autoAck, consumer);
47     }
48     
49     private static void doWork(String task) throws InterruptedException {
50         for (char ch : task.toCharArray()) {
51             if (ch == ‘.‘)
52                 Thread.sleep(1000);
53         }
54     }
55 
56 }
View Code

  先执行work1和work2,然后执行,work2执行效果如下:

  技术分享图片

  当接收到第7个消息的时候就挂掉了,work1执行效果如下:

  技术分享图片

  work1还是收到了第7个消息。说明rabbitmq没有收到工作者发送到确认消息,是不会从队列中删除掉消息的,它会把消息发给另一个工作者处理。消息应答是默认打开的,即autoAck默认值是false。

 一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ就会占用越来越多的内存。channel.basicAck(envelope.getDeliveryTag(), false), 消息确认这行代码注意加上哦。

  

  

  

 

  

  

 

以上是关于rabbitmq系列二 之工作队列的主要内容,如果未能解决你的问题,请参考以下文章

消息队列MQ——Spring Boot整合RabbitMQ

RabbitMQ消息队列系列教程Windows下安装和部署RabbitMQ

RabbitMQ系列教程之三:发布/订阅(Publish/Subscribe)

rabbitmq系列三 之发布/订阅

RabbitMQ系列教程之六:远程过程调用(RPC)

自动化运维Python系列之消息队列RabbitMQ