RabbitMQ-AMQP模型详解二

Posted 踩踩踩从踩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ-AMQP模型详解二相关的知识,希望对你有一定的参考价值。

RabbitMQ-AMQP模型详解_踩踩踩从踩的博客-CSDN博客

前言

上篇文章介绍了AMQP得流程,以及介绍Vhost Host、连接  、通道 、RoutingKey、exchange、绑定、message等组件;这篇文章会继续介绍AMQP中重要的概念,生产路由不可达,以及可靠的发布 事务机制,发布确认机制,消费者独占等机制

publisher

路由不可达

当消息发送给交换器或队列,在发送中,出现没有队列。

  • 交换没有绑定队列
  • 交换没法根据消息的路由key把消息路由到队列。

可以处理的情况 但是别抛异常 只是为找到交换器之类的

  • 退回
  • 死信队列(备用交换)

退回

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props,
byte[] body)
mandatory true 强制退回, false 不需退回,直接丢弃。
在发送数据时设置 
设置返回消息的回调处理
channel.addReturnListener(returnMessage -> {
				try {
					System.out.println("收到退回消息:" + new String(returnMessage.getBody(), "UTF-8"));
				} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}
			});

在spring中使用

  • 设置消息不可以路由退回,设置消息退回回调 【注意】一个 RabbitTemplate 只能设置一个 ReturnCallback
@Bean
	public RabbitTemplate busiARabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate template = new RabbitTemplate(connectionFactory);
		template.setMandatory(true); // 设置消息不可以路由退回
		// 设置消息退回回调 【注意】一个 RabbitTemplate 只能设置一个 ReturnCallback
		template.setReturnCallback(myReturnCallback());
		return template;
	}
  • replyCode broker的回应码 replyText 回应描述
private ReturnCallback myReturnCallback() {
		return new ReturnCallback() {
			@Override
			// replyCode broker的回应码 replyText 回应描述
			public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
					String routingKey) {

				// 在这里写退回处理逻辑
				System.out.println("收到回退消息 replyCode=" + replyCode + " replyText=" + replyText + " exchange=" + exchange
						+ " routingKey=" + routingKey);

				System.out.println(" 消息:" + message);
			}
		};
	}

在spring中,要重写 ReturnCallback,如果在spring中 配置文件中添加属性配置,这个是没有用的。在返回的数据可以知道

备用交换

  • policy  设置好策略,
rabbitmqctl set_policy mike "^my-direct$" '{"alternate-exchange":"my-ae"}'
#对那些交换器进行匹配  指定备用交换器
  • 代码中声明交换时通过参数指定备用交换
//声明参数 
Map<String, Object> args = new HashMap<String, Object>(); 
args.put("alternate-exchange", "my-ae"); 
//备用交换参数指定 
channel.exchangeDeclare("my-direct", "direct", false, false, args);
channel.exchangeDeclare("my-ae", "fanout"); 
channel.queueDeclare("routed"); 
channel.queueBind("routed", "my-direct", "key1"); 
channel.queueDeclare("unrouted");
channel.queueBind("unrouted", "my-ae", "");

加上备用参数进行指定上 myae上通道上去。

事务机制

怎么确认可靠发布,这就是事务机制要做的事情,保证网络传输的可靠发布。保证一个收,要么都收,无论是数据库,还是mq都是一样的,都是通过保证的。

当方法里面发布消息,并且需要做其他事情时,所以开启事务

spring 事务管理需要的组件 

事务管理器 TransactionManager  
@Configuration public class TxConfiguration { 
        @Bean 
        // 配置事务管理器 
   public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) { 
        return new RabbitTransactionManager(connectionFactory); 
    }
 }
spring 该怎么玩事务就怎么玩 .  
RabbitTransactionManager 只能做 Rabbitmq 的消息事务管理 只能是单连接的连接工厂
如果方法中,即要做数据库又要做rabbitmq,它没办法实现的。
没有分布式事务管理器实现。
rabbitmq 中事务机制来保证消息的可靠发布,性能是比较差  这是相对的发布确认机制。
调用 开启
@Transactional
	public void send(int i) {
		// 一定要设置ChannelTransacted(true) 表示开启通道事务
		this.template.setChannelTransacted(true);
		String message = "Hello World!-" + i;
		this.template.convertAndSend(queue.getName(), message);
		System.out.println(" [x] Sent '" + message + "'");
		if (i % 2 == 0)
			throw new RuntimeException();
	}

发布确认机制

性能是事务机制的 250 倍。
发布者发布消息,一般是走异步。
channel
有三种确认模式
  • 异步流式确认 事件驱动  优点 :开销低,吞吐量大  
  • 批量发布确认 批次等待,确认不ok 一批重发  
  • 单条确认 发一条就等待确认 
broker 给出确认会有三种结果
  • ack 接收成功
  • nack 接收失败
  • 发布者收不到Broker的确认(超时)

这都是确认会出现的情况。

异步流式确认

  • 开启发布确认模式 就不能再做事务管理了
  • 待确认消息的Map
  • 指定流式确认事件回调处理
  • 从Map中移除对应的消息
  • 重发,或做其他处理
// 1 开启发布确认模式 就不能再做事务管理了
			channel.confirmSelect();
			// 2 待确认消息的Map
			Map<Long, String> messagesMap = new ConcurrentHashMap<>();

			// 3 指定流式确认事件回调处理
			channel.addConfirmListener((deliveryTag, multiple) -> { // multiple表示是否是多条的确认
				System.out.println("收到OK ack:deliveryTag=" + deliveryTag + " multiple=" + multiple + ",从Map中移除消息");
				// 从Map中移除对应的消息
				messagesMap.remove(deliveryTag);
			}, (deliveryTag, multiple) -> {
				System.out.println("收到 NON OK ack:deliveryTag=" + deliveryTag + " multiple=" + multiple + " 从Map中移除消息,重发或做其他处理");
				// 从Map中移除对应的消息
				String message = messagesMap.remove(deliveryTag);
				// 重发,或做其他处理
				System.out.println("失败消息:" + message);
			});

for (int i = 1; i < 100; i++) {
				// 消息内容
				String message = "消息" + i;
				// 4 将消息放入到map中
				messagesMap.put(channel.getNextPublishSeqNo(), message);
				// 5、发送消息
				channel.basicPublish("mandatory-ex", "", true, null, message.getBytes());
				System.out.println("发布消息:" + message);

				Thread.sleep(2000L);
			}

在spring中添加 publisher-confirms 开启消息确认 

这里做流式确认 设置回调 发送

// 配置RabbitTemplate Bean
	@Bean
	public RabbitTemplate busiARabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate template = new RabbitTemplate(connectionFactory);
		// 设置发布确认回调,一个RabbitTemplate只可设置一个回调。
		template.setConfirmCallback(confirmCallback());

		return template;
	}

Consumer

消费者的两种消息模式、消费者 注册 取消 、独占消费者 消费者 优先级  消息确认  pull 拉模式消费。这几种模式。

两种消费模式

  • push 推模式
  • pull 拉模式

在这两种模式下面的问题出现

push 模式

broker client 消费者
client broker 注册对某个队列的消费者
// 对感兴趣的队列注册消费者,返回Server生成的consumerTag(消费者标识) String consumerTag = channel.basicConsume(queueName, true, callback, consumerTag -> {});
取消消费者注册
channel.basicCancel(consumerTag);

独占消费者

独占队列:被创建它的连接独占 这个连接上的 channel 可以共享。连接关闭,独占队列没有了。
独占消费者:消费者独占一个队列进行消息消费,适用场景: 消息一定要严格按序消费处理。
一旦独占消费者挂了的话,消息队列里面的数据就会一直存在着,因此需要备用的
在spring中 如何添加 只需要添加 exclusive设置为true就可以了

采用不断的重试去抢独占,也是防止被挂了。

消费者优先级

消费者多个时,可以根据需要设置优先级,设备好的机器,更高优先级处理

x-priority
Map<String, Object> args = new HashMap<String, Object>(); 
args.put("x-priority", 10); // 整数,数值越大优先级越高。 默认 0 
channel.basicConsume("my-queue", false, args, consumer);

注册时将优先级带上就行。

prefetch 20
Broker 轮询发送, 同优先级时是轮询 消息优先发给优先级高的消费者,直到 prefetch 满了 或 block.
先发满优先级高的。

消息确认

消息传递的模式(确认)
  • Automatic 自动 送货无需确认  直接就会把消息删除  吞吐量是相当高的  对于业务要求不高,并且吞吐量高的,可以采用这么模式
  • Manual 手动 需要客户签收
手动确认的情况 和 prefetch 配合使用
prefetch spring 中默认值为: 250   不能将这个设置太大,根据业务来的。
手动确认的 3 个操作
  • basic.ack 用于正面确认,消费者确认消息被妥善处理,broker可以移除该消息了。
  • basic.nack 用于负面确认,扩展了basic.reject,以支持批量确认。是RabbitMQAMQP-0-9-1扩展。
  • basic.reject 用于负面确认
负面确认,可以指示 Broker 移除消息以及是否重发。
手动操作
// 批量Nack,并重发 
GetResponse gr1 = channel.basicGet("some.queue", false);
 GetResponse gr2 = channel.basicGet("some.queue", false); channel.basicNack(gr2.getEnvelope().getDeliveryTag(), true, true); //第二个参数 true表示批量

在spring中 只需要设置 havingvalue就可以

 direct 采用异步的方式确认。

Pull 拉模式消费

从指定队列拉取一条消息,进行消息处理,然后根据处理结果决定该如何确认消息。 获得消息传递标识,手动单条确认消息ok,手动单条确认消息reject,并重配送,手动单条确认消息reject,移除不重配送;手动单条确认消息Nack,并重配送

public static void main(String[] args) throws Exception {
		// 1、创建连接工厂
		ConnectionFactory factory = new ConnectionFactory();
		// 2、设置连接属性
		factory.setHost("localhost");

		String queueName = "queue1";

		try (
				// 3、从连接工厂获取连接
				Connection connection = factory.newConnection("消费者1");
				// 4、从链接中创建通道
				Channel channel = connection.createChannel();) {

			// 5、声明(创建)队列 如果队列不存在,才会创建 RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
			channel.queueDeclare(queueName, true, false, false, null);

			System.out.println("开始接收消息");

			boolean autoAck = false;

			while (true) {
				// 从指定队列拉取一条消息
				GetResponse gr = channel.basicGet(queueName, autoAck);

				if (gr == null) {// 未取到消息
					System.out.println("队列上没有消息");
				} else { // 取到消息
							// 进行消息处理,然后根据处理结果决定该如何确认消息。
					System.out.println("取得消息:" + new String(gr.getBody(), "UTF-8"));
					System.out.println("消息的属性有:" + gr.getProps());
					System.out.println("队列上的消息数量有:" + gr.getMessageCount());

					// 获得消息传递标识
					long deliveryTag = gr.getEnvelope().getDeliveryTag();
					boolean multiple = false; // 是否批量
					boolean requeue = true; // 是否重配送
					// 手动单条确认消息ok
					// channel.basicAck(deliveryTag, false);
					// 手动单条确认消息reject,并重配送
					// channel.basicReject(deliveryTag, requeue);
					// 手动单条确认消息reject,移除不重配送
					// channel.basicReject(deliveryTag, false);
					// 手动单条确认消息Nack,并重配送
					// channel.basicNack(deliveryTag, multiple, requeue);
				}
				TimeUnit.SECONDS.sleep(2L);
			}

		}
	}

以上是关于RabbitMQ-AMQP模型详解二的主要内容,如果未能解决你的问题,请参考以下文章

验证码是怎么被机器识别的?Keras+CNN模型验证码识别详解

OpenGL基础学习之二代码基本结构

为什么二代测序的原始数据中会出现Read重复现象?

二代身份证号码编码规则

染色体基因芯片分析和第二代测序应用的区别

阿里云服务器AMD二代计算型c7a/通用型g7a/内存型r7a详解