rabbitMQ的进阶使用
Posted 零
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitMQ的进阶使用相关的知识,希望对你有一定的参考价值。
1.1 消息队列持久化
创建一个队列的时候,可以是非持久化的,也可以是持久化的
- 非持久化:rabbitmq如果重启,该队列就会被删除
- 持久化:重启不影响
- 消息持久化必须要消息队列持久化
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
1.2 消息的持久化
消息持久化,可以一定程度上去预防消息丢失,需要设置
MessageProperties.PERSISTENT_TEXT_PLAIN
注意:配置了消息持久化,并不能够完全保证消息不丢失,只能保证消息到了队列中不消失
消息丢失的方式:
- 发送方发送到消息队列的时候,丢了
- 交换机到队列中,丢了
- 队列到消费者,丢了,目前所学的知识点,队列中还有消息,再次发送
后续的解决方案,参考确认Confirm,Return
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
1.3 ACK
默认情况下,消费者接收到消息的时候(但是通常接到消息就处理了),就会进行自动应答(ACK)
如果一个消费者处理消息时间长,或者异常了,所以需要手动Ack
- 自动Ack
DeliverCallback deliverCallback = (consumerTag, delivery) ->
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
;
// true 表示自动ack:autoAck
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> );
- 手动Ack
// 1.channel.basicConsume 将true改为false
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> );
// 2.在finally 中手动Ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
1.4 消费不均匀
工作模式中,默认轮询,会出现一些问题,比如:两个消费者,亮跟大哥,实现功能(10个),
亮:1,3,5,7,9
大哥:2,4,6,8,10
但是大哥实现的快,出现一个问题,大哥3个小时完成了,摸鱼5小时,亮做了8小时
所以我们可以设置一个预抓取值,官网给的是一次抓取一个任务(每次抓取一个消息,当上一个消息没有ack的时候,不抓取新的消息)
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意:这里的预抓取值给的是1,这个值太小(消费者能力比较强,就会等待),但是具体还是看服务器性能跟消息复杂度,通常情况下100-300
2 Publish/Subscribe(Fanout)
多出来的X,就是交换机,交换机接收发送方的消息,将消息发送到队列中,交换机还有不同的类型
Exchange的类型
- Fanout:交换机会将消息发送到所有和它进行绑定的队列上,广播,群发
- Direct:直接交换机通过消息上的路由键直接对消息进行分发,相当于精确匹配,一对一
- Topic:这个交换机会将路由键和绑定上的模式进行通配符匹配,相当于模糊匹配,一对多
- Headers:消息头交换机使用消息头的属性进行消息路由,相当于模糊匹配(like header%),一对多
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bmlRWZxU-1649855291807)(消息队列.assets/image-20220413104215839.png)]
2.1 操作
- 发送方
public class EmitLog
// 交换机名称
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception
try (Channel channel = RabbitMqUtils.getChannel())
// 创建交换机,并设置交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Scanner scanner = new Scanner(System.in);
while (true)
System.out.print("请输入要发送的消息:");
String msg = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes("UTF-8"));
System.out.println("消息已发送:" + msg);
- 消费方 01- 控制台打印
// 第一个消费者
public class ReceiveLogs01
// 交换机名称
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception
Channel channel = RabbitMqUtils.getChannel();
// 创建一个交换机,并设置类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 创建一个非持久化队列
String queueName = channel.queueDeclare().getQueue();
// 将交换机和队列绑定
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" 准备接收消息。。。");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
String message = new String(delivery.getBody(), "UTF-8");
// 第一个消费者在控制台进行答应
System.out.println("控制台打印的消息:" + message);
;
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> );
- 消费者02-保存到本地硬盘
public class ReceiveLogs02
// 交换机名称
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception
Channel channel = RabbitMqUtils.getChannel();
// 创建一个交换机,并设置类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 创建一个非持久化队列
String queueName = channel.queueDeclare().getQueue();
// 将交换机和队列绑定
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" 准备接收消息。。。");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
String message = new String(delivery.getBody(), "UTF-8");
// 第二个消费者将消息保存到本地
File file = new File("C:\\\\rabbitmq_log.txt");
FileUtils.writeStringToFile(file, message, "UTF-8");
System.out.println("日志写入成功");
;
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> );
3 Routing(direct)
在上一个发布订阅中,实现了一对多,但是每一个消息,都会执行我们指定的两个操作,粒度更加细致
对于error级别的信息,需要在控制台打印,并且保存到本地,对于其他信息,仅仅是控制台打印
3.1 操作
- 发送方
public class EmitLogDirect
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception
final Channel channel = RabbitMqUtils.getChannel();
// 1.创建交换机,更换了交换机的类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 准备路由的key
Map<String, String> map = new HashMap<>();
map.put("info", "普通的info消息");
map.put("warning", "warning警告消息");
map.put("error", "error错误消息");
map.put("debug", "debug调式消息");
// 循环map,key routkey, value,是发送的消息
for (Map.Entry<String, String> entry : map.entrySet())
String key = entry.getKey();
String value = entry.getValue();
// 发送
channel.basicPublish(EXCHANGE_NAME, key, null, value.getBytes("UTF-8"));
System.out.println("消息发送成功");
4 Topics (Topic)
发送到主题交换的消息不能有任意的 routing_key - 它必须是单词列表,由点分隔。
这些词可以是任何东西,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:“ stock.usd.nyse ”、“ nyse.vmw ”、“ quick.orange.rabbit ”。路由键中可以有任意多的单词,最多为 255 个字节
-
*(星号)可以只替换一个单词。
-
# (hash) 可以代替零个或多个单词
-
绑定的时候出现情况
- 当一个队列绑定的是#,那么这个队列是不是接收了所有的消息,这个时候跟fanout有点像
- 当一个队列绑定的既没有#,也没有*,那就是一个direct
-
quick.orange.rabbit - Q1,Q2
-
lazy.orange.ele - Q1,Q2
-
quick.orange.fix - Q1
-
lazy.brown.fox - Q2
-
lazy.pink.rabbit - Q2 满足了两次,消息一条
-
quick.brown.fox - 不满足
-
quick.orange.male.rabbit - 不满足
-
lazy.orange.male.rabbit - Q2
4.1 操作
- 发送方
public class EmitLogTopic
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception
final Channel channel = RabbitMqUtils.getChannel();
// 1.创建交换机,更换了交换机的类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 准备路由的key
Map<String, String> map = new HashMap<>();
map.put("quick.orange.rabbit", "Q1,Q2接收");
map.put("lazy.orange.ele", "Q1,Q2接收");
map.put("quick.orange.fix", "Q1接收");
map.put("lazy.pink.rabbit", "Q2接收");
map.put("quick.brown.fox", "不满足");
map.put("quick.orange.male.rabbit", "不满足");
map.put("lazy.orange.male.rabbit", "Q2接收");
// 循环map,key routkey, value,是发送的消息
for (Map.Entry<String, String> entry : map.entrySet())
String key = entry.getKey();
String value = entry.getValue();
// 发送
channel.basicPublish(EXCHANGE_NAME, key, null, value.getBytes("UTF-8"));
System.out.println("消息发送成功");
- 消费方
public class ReceiveLogsTopict01
// 交换机名称
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception
Channel channel = RabbitMqUtils.getChannel();
// 创建一个交换机,并设置类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 创建一个非持久化队列
String queueName = channel.queueDeclare().getQueue();
// 将交换机和队列绑定
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println(" 准备接收消息。。。");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
String message = new String(delivery.getBody(), "UTF-8");
message = "绑定的key:" + delivery.getEnvelope().getRoutingKey() + "-绑定的值:" + message + "-规则(*.orange.*)";
// 第一个消费者在控制台进行答应
System.out.println("控制台打印的消息:" + message);
;
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> );
public class ReceiveLogsTopict02
// 交换机名称
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception
Channel channel = RabbitMqUtils.getChannel();
// 创建一个交换机,并设置类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 创建一个非持久化队列
String queueName = channel.queueDeclare().getQueue();
// 将交换机和队列绑定
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println(" 准备接收消息。。。");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
String message = new String(delivery.getBody(), "UTF-8");
message = "绑定的key:" + delivery.getEnvelope().getRoutingKey() + "-绑定的值:" + message + "-规则:(*.*.rabbit,lazy.#)";
// 第一个消费者在控制台进行答应
System.out.println("控制台打印的消息:" + message);
;
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> );
5 Publisher Confirms
不仅仅是防止发送方到mq的消息丢失,还可以通过异步的方式来提高效率
1.为了消息不丢失,持久化,持久化是同步的,可以通过confirm的异步确认提高效率
// 1.启用发布取人
channel.confirmSelect();
5.1 三种方式
1.单个确认(同步)
2.批量确认(同步)
3.异步确认
以上是关于rabbitMQ的进阶使用的主要内容,如果未能解决你的问题,请参考以下文章