RabbitMQ 学习---- 工作队列模式
Posted RAIN 7
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 学习---- 工作队列模式相关的知识,希望对你有一定的参考价值。
文章目录
RabbitMQ 学习(四)---- 工作队列模式
这是第二种模型 (Work Queue),任务模型,当消息处理比较耗时的时候,生产者发送消息的速度远远大于消费的速度,长此以往,消息就会堆积的越来越多,无法及时处理,可以使用work模型,让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
(1)公平竞争机制
一个生产者发送消息到默认交换机,通过路由同名规则将 队列中的信息 循环分发到 监听队列的消费者中,一对多,不过是公平分发,按照顺序将每条消息发送给每一个消费者。每个消费者平均分配队列中的消息。公平竞争。
这里的生产者代码与 之前简单模型,一模一样,只不过循环发送了多条消息等待分配
package workAver;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import utils.RabbitMQUtils;
import java.io.IOException;
public class AvgProvider
public static void main(String[] args)
Connection connection =null;
Channel channel =null;
try
// 1、获取连接对象
connection = RabbitMQUtils.getConnect();
//2、通过连接获取信道
assert connection != null;
channel = connection.createChannel();
// 声明发送的消息
String message = "work平均分配生产的消息!";
//3、声明队列信息
channel.queueDeclare("work", false, false, false, null);
for (int i = 1; i <= 10; i++)
//4、使用信道发送消息, routineKey与队列同名方便匹配
channel.basicPublish("", "work", MessageProperties.PERSISTENT_TEXT_PLAIN, (message+": "+i).getBytes());
catch (IOException e)
e.printStackTrace();
finally
RabbitMQUtils.close(channel,connection);
创建多个消费者,与之前简单模型一模一样,只不过创建了多个连接到 rabbitMq的服务器上对 队列进行监听
消费者1
package workAver;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
public class AvgCustomer1
public static void main(String[] args)
try
Connection connection = RabbitMQUtils.getConnect();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("work", false, false, false, null);
// 该消费者1 接收队列中的消息
channel.basicConsume("work", true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println(new String(body));
);
catch (IOException e)
e.printStackTrace();
消费者2
package workAver;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
public class AvgCustomer2
public static void main(String[] args)
try
Connection connection = RabbitMQUtils.getConnect();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("work", false, false, false, null);
// 该消费者1 接收队列中的消息
channel.basicConsume("work", true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println(new String(body));
);
catch (IOException e)
e.printStackTrace();
消费者1 收到的消息情况,全部是奇数
消费者2 收到的消息的情况,全部都是偶数
可以证明,监听队列的消费者中间一次循环接收队列中的消息,公平竞争
(2)能者多劳机制
我们需要了解消费者的自动确认机制
默认情况下,RabbitMQ会按照顺序将每个消息发送到下一个使用者,平均每个消费者会受到相同数量的消息,这种分发消息的机制称为循环。
而为什么会自动平均分配呢?
因为我们在消费者消费的时候有一个参数设置为 autoAck:true
,我们设置消费者接收消息自动确认,而一般都是队列中分配好了那个消费者要传递什么信息,直接一次全部传递过去,不是消费一个确认一下。消费者接收到所有消息之后自动确认,队列中会标记删除所有的信息,他不关心你接收完信息之后的后续业务操作。就是只关心你是否收到了数据。
消息自动确认机制,完成一项任务可能需要几秒钟甚至几分钟,如果一个消费者开始了一项长期的任务,却只完成了一部分就挂了,那么RabbitMQ一旦将消息传递给消费者,就会立刻标记删除,那么因为消费者挂了,接收数据的时候已经确认应答了,队列中的数据也删除了,所以剩余接收到的信息也没了。
autoAck 取消 ,手动确认
如果生产者发送10条消息,消费者1拿到5条,消费者2拿到5条,不进行自动应答,服务器队列的数据即使消费了,我们没有应答就不会被标记删除,保证服务器队列中的数据一直还在。如果消费者处理完了这条数据,那么手动确认,队列中知道已经确认了进行删除
接收消息的时候,参数设置为false
执行回调函数的时候,手动确认
在这里,我们在 handlerDeliver 方法中,
-
第一个参数 deliveryTag,代表本次信道传递的标签标识,Envelope 类的实例包含了 本次传递的 deliverTag数据可直接获取
-
第二个参数 multiple:批量
比如批量确认.当multiple的值设置为true时,RabbitMQ将确认指定传输标签以及之前所有未被确认的消息。与单个确认相同,批量确认的作用域为每个通道。例如:通道Ch上有四个未被确认的消息,标签分别为5,6,7,8;当一个delivery_tag值为8并且multiple值为true的确认消息到达通道时,所有5到8的标签都会被确认。如果multiple值设置为false,标签为5,6,7的消息将不会被确认。
// 手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
补充一点还有其他手动确认的API
chanel 传递1条数据
在之前的公平竞争机制下,说是按照顺序给每一个消费者数据,其实在发送给消费者之前在内部已经计算好了,给消费者一第1、3、5、7、9数据,一次性发送,给消费者二第2、4、6、8、10条数据,然后一次性发送,如果我们手动确认的话,那么相当于一次确认一大批,队列经过确认后进行删除,如果在后续处理业务中挂掉了,照样消息已经删除了。
我们设置一个信道每次只能消费一个消息,如果其中一个消费者服务器挂掉了,连接就断掉了,剩余的未被手动确认的数据还在队列中保存。也能及时得把剩余的消息继续交给消费者2进行处理,不耽误业务的持续进行。
这就是能者多劳的机制。就是说处理快的消费者处理完业务会很快的手动确认,然后再次进行接收新的消息,处理慢的消费者经过一段时间处理之后再进行确认,就会能者多劳,业务处理快的接受的消息多,处理满的接受的少
在消费消息之前设置信道中接收消息只能是1个
chanel.basicQos(1); // 设置信道中一次只能消费一个信息
(3) 能者多劳的代码案例
1、生产者
生产者发送给队列 10条消息
package workAver;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import utils.RabbitMQUtils;
import java.io.IOException;
public class AvgProvider
public static void main(String[] args)
Connection connection =null;
Channel channel =null;
try
// 1、获取连接对象
connection = RabbitMQUtils.getConnect();
//2、通过连接获取信道
assert connection != null;
channel = connection.createChannel();
// 声明发送的消息
String message = "work平均分配生产的消息!";
//3、声明队列信息
channel.queueDeclare("work", false, false, false, null);
for (int i = 1; i <= 10; i++)
//4、使用信道发送消息, routineKey与队列同名方便匹配
channel.basicPublish("", "work", MessageProperties.PERSISTENT_TEXT_PLAIN, (message+": "+i).getBytes());
catch (IOException e)
e.printStackTrace();
finally
RabbitMQUtils.close(channel,connection);
2、消费者1
接收到消息之后,会休眠2秒,在进行业务操作,作为处理较慢的消费者,设置信道每次传递一个,处理完业务手动确认ack。
package workAver;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
public class AvgCustomer1
public static void main(String[] args)
try
Connection connection = RabbitMQUtils.getConnect();
final Channel channel = connection.createChannel();
//声明信道中一次只能接受一条信息
channel.basicQos(1);
// 声明队列
channel.queueDeclare("work", false, false, false, null);
// 该消费者1 接收队列中的消息
channel.basicConsume("work", false, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
// 休眠2s
try
Thread.sleep(2000);
catch (InterruptedException e)
e.printStackTrace();
// 处理业务
System.out.println(new String(body));
//手动确认,删除队列中的信息
channel.basicAck(envelope.getDeliveryTag(), false);
);
catch (IOException e)
e.printStackTrace();
3、消费者2
接收到消息之后,休眠1秒,作为消费较快的消费者,设置信道传递1条数据,处理完业务之后手动确认。
package workAver;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
public class AvgCustomer2
public static void main(String[] args)
try
Connection connection = RabbitMQUtils.getConnect();
final Channel channel = connection.createChannel();
//声明信道中一次只能接受一条信息
channel.basicQos(1);
// 声明队列
channel.queueDeclare("work", false, false, false, null);
// 该消费者1 接收队列中的消息
channel.basicConsume("work", false, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
// 休眠1秒
try
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();
// 处理业务
System.out.println(new String(body));
// 手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
);
catch (IOException e)
e.printStackTrace();
4、查看接收结果
消费者1,处理业务2s,处理了4条消息
消费者2,处理业务1s,处理了6条消息
实现了能者多劳机制
以上是关于RabbitMQ 学习---- 工作队列模式的主要内容,如果未能解决你的问题,请参考以下文章