RabbitMQ:消息分发模型
Posted 栗筝i
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ:消息分发模型相关的知识,希望对你有一定的参考价值。
Work queues,也被称为 Task queues,任务模型,当消息处理比较耗时时的时候,可能产生消息的速度会远远大于消息的消费速度,消息会堆积越来越多,无法及时处理,此时就可以使用work模型:让多个消费者绑定一个队列,共同消费队列中的消息,队列中的消息一旦消费,就会消失,因此任务不会被重复执行。
RabbitMQ 单生产单消费模型主要有以下三个角色构成:
- 生产者(producer/ publisher):一个发送消息的用户应用程序。
- 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
- 消费者2(consumer):领取任务并完成…
- 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
~
本篇内容包括:RabbitMQ 消息分发模型、RabbitMQ 消息分发模型实现、RabbitMQ 手动消息确认
文章目录
一、RabbitMQ 消息分发模型
1、消息分发模型(Work Queue 模型)
Work queues,也被称为 Task queues,任务模型,当消息处理比较耗时时的时候,可能产生消息的速度会远远大于消息的消费速度,消息会堆积越来越多,无法及时处理,此时就可以使用work模型:让多个消费者绑定一个队列,共同消费队列中的消息,队列中的消息一旦消费,就会消失,因此任务不会被重复执行。
2、消息分发模型组成
RabbitMQ 单生产单消费模型主要有以下四个角色构成:
- 生产者(producer/ publisher):一个发送消息的用户应用程序。
- 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
- 消费者2(consumer):领取任务并完成…
- 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
二、RabbitMQ 消息分发模型实现
1、添加 Maven 依赖
# 在 pom.xml 文件中添加以下依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
...
</dependencies>
2、封装工具类 ConnectionUtil
# 连接工具类封装-ConnectionUtil
package com.lizhengi.work.demo1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author liziheng
* @version 1.0.0
* @description 连接工具类封装
* @date 2022-12-23 11:29 上午
**/
public class ConnectionUtil
/**
* 创建 MQ 连接工厂对象
*/
private static final ConnectionFactory CONNECTION_FACTORY;
// 类加载时,重量级资源加载
static
CONNECTION_FACTORY = new ConnectionFactory();
// 设置连接MQ主机
CONNECTION_FACTORY.setHost("127.0.0.1");
// 设置连接MQ端口号
CONNECTION_FACTORY.setPort(5672);
// 设置连接MQ虚拟主机
CONNECTION_FACTORY.setVirtualHost("/test");
// 设置连接MQ虚拟主机的用户名密码
CONNECTION_FACTORY.setUsername("admin");
CONNECTION_FACTORY.setPassword("123456");
/**
* 建立与RabbitMQ连接 工具方法
*
* @return Connection
*/
public static Connection getConnection()
try
// 返回连接对象
return CONNECTION_FACTORY.newConnection();
catch (Exception e)
e.printStackTrace();
return null;
/**
* 关闭通道和连接 工具方法
*
* @param channel Channel
* @param connection Connection
*/
public static void closeConnectionAndChanel(Channel channel, Connection connection)
try
if (channel != null)
channel.close();
if (connection != null)
connection.close();
catch (Exception e)
e.printStackTrace();
3、生产者实现
package com.lizhengi.work.demo1;
import com.lizhengi.hello.demo2.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.testng.annotations.Test;
import java.io.IOException;
/**
* @author liziheng
* @version 1.0.0
* @description Rabbit 消息分发模型 生产者
* @date 2022-12-23 11:32 上午
**/
public class Producer
/**
* 消息生产
*
* @throws IOException IOException
*/
@Test
public void testSendMessage() throws IOException
// 获取连接对象
Connection connection = ConnectionUtil.getConnection();
// 获取通道
assert connection != null;
Channel channel = connection.createChannel();
// 通过通道声明队列
channel.queueDeclare("WorkQueue", true, false, false, null);
// 生产消息
for (int i = 0; i < 100; i++)
channel.basicPublish("", "WorkQueue", null, ("Hello RabbitMQ " + i).getBytes());
// 资源关闭
ConnectionUtil.closeConnectionAndChanel(channel, connection);
4、消费者-1 实现
package com.lizhengi.work.demo1;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author liziheng
* @version 1.0.0
* @description Rabbit 消息分发模型 消费者-1
* @date 2022-12-23 11:32 上午
**/
public class Customer1
public static void main(String[] msg) throws IOException
Connection connection = ConnectionUtil.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.queueDeclare("WorkQueue", true, false, false, null);
channel.basicConsume("WorkQueue", true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
System.out.println("消费者1:" + new String(body));
try
Thread.sleep(1000);
catch (Exception e)
e.printStackTrace();
);
5、消费者-2 实现
package com.lizhengi.work.demo1;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author liziheng
* @version 1.0.0
* @description Rabbit 消息分发模型 消费者-2
* @date 2022-12-23 11:32 上午
**/
public class Customer2
public static void main(String[] msg) throws IOException
Connection connection = ConnectionUtil.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.queueDeclare("WorkQueue", true, false, false, null);
channel.basicConsume("WorkQueue", true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
System.out.println("消费者2:" + new String(body));
);
6、消息队列的循环机制
运行上面的消费者与生产者,观察消费者打印:
消费者1:Hello RabbitMQ 1
消费者1:Hello RabbitMQ 3
消费者1:Hello RabbitMQ 5
消费者1:Hello RabbitMQ 7
消费者1:Hello RabbitMQ 9
消费者2:Hello RabbitMQ 0
消费者2:Hello RabbitMQ 2
消费者2:Hello RabbitMQ 4
消费者2:Hello RabbitMQ 6
消费者2:Hello RabbitMQ 8
可以观察到,两个消费者依次打印了队列中的消息,哪怕是在其中一个加上 Thread.sleep(1000);
依然是这种循环机制!
三、RabbitMQ 手动消息确认
1、消费者-1 实现
package com.lizhengi.work.demo2;
import com.lizhengi.work.demo1.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author liziheng
* @version 1.0.0
* @description Rabbit 消息分发模型 消费者-1
* @date 2022-12-23 11:32 上午
**/
public class Customer1
public static void main(String[] msg) throws IOException
Connection connection = ConnectionUtil.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
// 设置每次只能消费一个消息
channel.basicQos(1);
channel.queueDeclare("WorkQueue", true, false, false, null);
// 参数2:自动消息确认关闭
channel.basicConsume("WorkQueue", true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
try
Thread.sleep(1000);
catch (Exception e)
e.printStackTrace();
System.out.println("消费者1:" + new String(body));
/* 手动消息确认
* 参数1:确认队列中的那个具体消息
* 参数2:是否开启多个消息同时确认
*/
channel.basicAck(envelope.getDeliveryTag(), false);
);
2、消费者-2 实现
package com.lizhengi.work.demo2;
import com.lizhengi.work.demo1.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author liziheng
* @version 1.0.0
* @description Rabbit 消息分发模型 消费者-2
* @date 2022-12-23 11:32 上午
**/
public class Customer2
public static void main(String[] msg) throws IOException
Connection connection = ConnectionUtil.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
//设置每次只能消费一个消息
channel.basicQos(1);
channel.queueDeclare("WorkQueue", true, false, false, null);
channel.basicConsume("WorkQueue", false, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("消费者2:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
);
3、实现能者多劳
运行上面的消费者与生产者,观察消费者打印:
消费者1:Hello RabbitMQ 0
消费者1:Hello RabbitMQ 2
消费者1:Hello RabbitMQ 3
消费者1:Hello RabbitMQ 4
消费者1:Hello RabbitMQ 5
消费者1:Hello RabbitMQ 6
消费者1:Hello RabbitMQ 7…
消费者2:Hello RabbitMQ 1
消费者2:Hello RabbitMQ 64
消费者2:Hello RabbitMQ 76
消费者2:Hello RabbitMQ 89
可以观察到,消费者1开启了手动消费机制后进行了更大比重消息数量的消费!
以上是关于RabbitMQ:消息分发模型的主要内容,如果未能解决你的问题,请参考以下文章