RabbitMQ-AMQP模型详解二
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ-AMQP模型详解二相关的知识,希望对你有一定的参考价值。
RabbitMQ-AMQP模型详解_踩踩踩从踩的博客-CSDN博客
前言
上篇文章介绍了AMQP得流程,以及介绍Vhost Host、连接 、通道 、RoutingKey、exchange、绑定、message等组件;这篇文章会继续介绍AMQP中重要的概念,生产路由不可达,以及可靠的发布 事务机制,发布确认机制,消费者独占等机制
publisher
路由不可达
当消息发送给交换器或队列,在发送中,出现没有队列。
- 交换没有绑定队列
- 交换没法根据消息的路由key把消息路由到队列。
可以处理的情况 但是别抛异常 只是为找到交换器之类的
- 退回
- 死信队列(备用交换)
退回
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props,
byte[] body)
channel.addReturnListener(returnMessage -> {
try {
System.out.println("收到退回消息:" + new String(returnMessage.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
});
在spring中使用
- 设置消息不可以路由退回,设置消息退回回调 【注意】一个 RabbitTemplate 只能设置一个 ReturnCallback
@Bean
public RabbitTemplate busiARabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true); // 设置消息不可以路由退回
// 设置消息退回回调 【注意】一个 RabbitTemplate 只能设置一个 ReturnCallback
template.setReturnCallback(myReturnCallback());
return template;
}
- replyCode broker的回应码 replyText 回应描述
private ReturnCallback myReturnCallback() {
return new ReturnCallback() {
@Override
// replyCode broker的回应码 replyText 回应描述
public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
String routingKey) {
// 在这里写退回处理逻辑
System.out.println("收到回退消息 replyCode=" + replyCode + " replyText=" + replyText + " exchange=" + exchange
+ " routingKey=" + routingKey);
System.out.println(" 消息:" + message);
}
};
}
在spring中,要重写 ReturnCallback,如果在spring中 配置文件中添加属性配置,这个是没有用的。在返回的数据可以知道
备用交换
- policy 设置好策略,
rabbitmqctl set_policy mike "^my-direct$" '{"alternate-exchange":"my-ae"}'
#对那些交换器进行匹配 指定备用交换器
- 代码中声明交换时通过参数指定备用交换
//声明参数
Map<String, Object> args = new HashMap<String, Object>();
args.put("alternate-exchange", "my-ae");
//备用交换参数指定
channel.exchangeDeclare("my-direct", "direct", false, false, args);
channel.exchangeDeclare("my-ae", "fanout");
channel.queueDeclare("routed");
channel.queueBind("routed", "my-direct", "key1");
channel.queueDeclare("unrouted");
channel.queueBind("unrouted", "my-ae", "");
加上备用参数进行指定上 myae上通道上去。
事务机制
怎么确认可靠发布,这就是事务机制要做的事情,保证网络传输的可靠发布。保证一个收,要么都收,无论是数据库,还是mq都是一样的,都是通过保证的。
spring 事务管理需要的组件
@Configuration public class TxConfiguration {
@Bean
// 配置事务管理器
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
@Transactional
public void send(int i) {
// 一定要设置ChannelTransacted(true) 表示开启通道事务
this.template.setChannelTransacted(true);
String message = "Hello World!-" + i;
this.template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
if (i % 2 == 0)
throw new RuntimeException();
}
发布确认机制
- 异步流式确认 事件驱动 优点 :开销低,吞吐量大
- 批量发布确认 批次等待,确认不ok 一批重发
- 单条确认 发一条就等待确认
- ack 接收成功
- nack 接收失败
- 发布者收不到Broker的确认(超时)
这都是确认会出现的情况。
异步流式确认
- 开启发布确认模式 就不能再做事务管理了
- 待确认消息的Map
- 指定流式确认事件回调处理
- 从Map中移除对应的消息
- 重发,或做其他处理
// 1 开启发布确认模式 就不能再做事务管理了
channel.confirmSelect();
// 2 待确认消息的Map
Map<Long, String> messagesMap = new ConcurrentHashMap<>();
// 3 指定流式确认事件回调处理
channel.addConfirmListener((deliveryTag, multiple) -> { // multiple表示是否是多条的确认
System.out.println("收到OK ack:deliveryTag=" + deliveryTag + " multiple=" + multiple + ",从Map中移除消息");
// 从Map中移除对应的消息
messagesMap.remove(deliveryTag);
}, (deliveryTag, multiple) -> {
System.out.println("收到 NON OK ack:deliveryTag=" + deliveryTag + " multiple=" + multiple + " 从Map中移除消息,重发或做其他处理");
// 从Map中移除对应的消息
String message = messagesMap.remove(deliveryTag);
// 重发,或做其他处理
System.out.println("失败消息:" + message);
});
for (int i = 1; i < 100; i++) {
// 消息内容
String message = "消息" + i;
// 4 将消息放入到map中
messagesMap.put(channel.getNextPublishSeqNo(), message);
// 5、发送消息
channel.basicPublish("mandatory-ex", "", true, null, message.getBytes());
System.out.println("发布消息:" + message);
Thread.sleep(2000L);
}
在spring中添加 publisher-confirms 开启消息确认
这里做流式确认 设置回调 发送
// 配置RabbitTemplate Bean
@Bean
public RabbitTemplate busiARabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 设置发布确认回调,一个RabbitTemplate只可设置一个回调。
template.setConfirmCallback(confirmCallback());
return template;
}
Consumer
消费者的两种消息模式、消费者 注册 取消 、独占消费者 消费者 优先级 消息确认 pull 拉模式消费。这几种模式。
两种消费模式
- push 推模式
- pull 拉模式
在这两种模式下面的问题出现
push 模式
// 对感兴趣的队列注册消费者,返回Server生成的consumerTag(消费者标识) String consumerTag = channel.basicConsume(queueName, true, callback, consumerTag -> {});
channel.basicCancel(consumerTag);
独占消费者
采用不断的重试去抢独占,也是防止被挂了。
消费者优先级
消费者多个时,可以根据需要设置优先级,设备好的机器,更高优先级处理
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", 10); // 整数,数值越大优先级越高。 默认 0
channel.basicConsume("my-queue", false, args, consumer);
注册时将优先级带上就行。
消息确认
- Automatic 自动 送货无需确认 直接就会把消息删除 吞吐量是相当高的 对于业务要求不高,并且吞吐量高的,可以采用这么模式
- Manual 手动 需要客户签收
- basic.ack 用于正面确认,消费者确认消息被妥善处理,broker可以移除该消息了。
- basic.nack 用于负面确认,扩展了basic.reject,以支持批量确认。是RabbitMQ对AMQP-0-9-1的扩展。
- basic.reject 用于负面确认
// 批量Nack,并重发
GetResponse gr1 = channel.basicGet("some.queue", false);
GetResponse gr2 = channel.basicGet("some.queue", false); channel.basicNack(gr2.getEnvelope().getDeliveryTag(), true, true); //第二个参数 true表示批量
在spring中 只需要设置 havingvalue就可以
direct 采用异步的方式确认。
Pull 拉模式消费
从指定队列拉取一条消息,进行消息处理,然后根据处理结果决定该如何确认消息。 获得消息传递标识,手动单条确认消息ok,手动单条确认消息reject,并重配送,手动单条确认消息reject,移除不重配送;手动单条确认消息Nack,并重配送
public static void main(String[] args) throws Exception {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("localhost");
String queueName = "queue1";
try (
// 3、从连接工厂获取连接
Connection connection = factory.newConnection("消费者1");
// 4、从链接中创建通道
Channel channel = connection.createChannel();) {
// 5、声明(创建)队列 如果队列不存在,才会创建 RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
channel.queueDeclare(queueName, true, false, false, null);
System.out.println("开始接收消息");
boolean autoAck = false;
while (true) {
// 从指定队列拉取一条消息
GetResponse gr = channel.basicGet(queueName, autoAck);
if (gr == null) {// 未取到消息
System.out.println("队列上没有消息");
} else { // 取到消息
// 进行消息处理,然后根据处理结果决定该如何确认消息。
System.out.println("取得消息:" + new String(gr.getBody(), "UTF-8"));
System.out.println("消息的属性有:" + gr.getProps());
System.out.println("队列上的消息数量有:" + gr.getMessageCount());
// 获得消息传递标识
long deliveryTag = gr.getEnvelope().getDeliveryTag();
boolean multiple = false; // 是否批量
boolean requeue = true; // 是否重配送
// 手动单条确认消息ok
// channel.basicAck(deliveryTag, false);
// 手动单条确认消息reject,并重配送
// channel.basicReject(deliveryTag, requeue);
// 手动单条确认消息reject,移除不重配送
// channel.basicReject(deliveryTag, false);
// 手动单条确认消息Nack,并重配送
// channel.basicNack(deliveryTag, multiple, requeue);
}
TimeUnit.SECONDS.sleep(2L);
}
}
}
以上是关于RabbitMQ-AMQP模型详解二的主要内容,如果未能解决你的问题,请参考以下文章