Spring AMQP杂记之AMQP基本概念
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring AMQP杂记之AMQP基本概念相关的知识,希望对你有一定的参考价值。
参考技术A 接下来步入正题:AMQP本质是一个消息队列协议而并非一个框架,我们熟知的RabbitMQ,RocketMQ才是实现了AMQP的消息队列框架。Spring默认的AMQP实现也是唯一的实现是RabbitMQ。至于阿里的火箭MQ,apache的activeMQ就没有提供实现啦。至于以上这些MQ框架孰优孰劣,要根据实际情况自己判断了。本人更偏向于使用RabbitMQ,一个因为是Spring对其进行了封装,使用起来比较简单。。。(最主要的原因),二是因为本身他是Erlang语言编写,对于并发的支持较好,而且网上的各家MQ的性能测试对比中表现的也比较好。首先介绍AMQP中几个重要的概念:下面这张图片是RabbitMQ官网中对于AMQP模型的抽象
如图,整个消息流转的过程是:生产者将消息发送到Exchange中,Exchange再将消息根据一定的规则路由到指定的队列中,最后消费者从指定的队列中获取消息。其中被方框圈起来的组件可以称为Broker,即MQ服务器,Publisher和Consumer就是我们自己实现的生产者和消费者。接下来详细介绍一下上面的几个组件。
Exchange(交换机)类型:他是生产者发送消息的地方,交换机收到消息后将消息路由到0或多个队列中,AMQP一共定义了4种交换机,如图:
其实还有一个Default Exchange,只不过它是被MQ服务器提前声明的一种没有名字的Direct exchange。但是它有一个很重要的属性,创建的每一个队列都会自动的用队列名作为路由键(路由key路由的规则)绑定到该exchange中。打个比方,比如你创建了一个名称为“talk-is-cheap——show-me-the-code”的队列,MQ会自动的用“talk-is-cheap——show-me-the-code”作为路由key绑定到Default exchange,所以当生产者发送的消息携带名为“talk-is-cheap——show-me-the-code”的路由key时就会发送到声明的队列中去了。
Direct exchange:
他是根据路由key将消息发送到指定队列,比较适合一对一的场景。工作流程如下:声明队列时指定一个路由key绑定到exchange,当有携带相同key的消息到达exchange中时就会自动的转发给该队列了。
Fanout exchange:
如图,该交换机是将消息转发到所有绑定到该交换机上的队列,这个时候如果你发给交换机的消息携带路由key,他也会忽略掉。这个场景最适合的就是广播了,类似村书记喇叭一喊,大家都能听到了。这里贴几个官网对于该交换机几个使用场景的介绍。
Topic Exchange:
他通过一定的路由策略将符合条件的路由key转发到多个符合路由策略的队列上,和直连交换机类似,只不过他匹配的是key的模式,类似于正则匹配。其中*代表任意的一个词,#代表0或多个词。使用场景比如这个任务队列里的消息需要多个消费者处理,而每个消费者只能处理特定的任务。
Headers Exchange:
类似于http的head,可以设置多个属性,根据属性进行队列匹配。首部交换机也会忽略路由键。当有多个首部的时候怎么知道该匹配哪一个呢?交换机有一个“x-match”参数,设置为any表示任意一个匹配即可,设置为all则表示所有的头部信息都匹配才会发送到指定队列。这个交换机工作中不咋用,好像略微有点鸡肋。
呃,这个其实和普通的队列没啥区别,只不过它里面可以设置一些额外的属性,例如持久化、自动删除等。有一个地方需要注意,如果不能将AMQP消息路由到任何队列(例如,路由键没有对应的队列),则可以根据发布者设置的消息属性,将其删除或返回给发布者。这个设置在保证消息可靠性时是十分重要的。(关于如何保证消息可靠性可以看这篇文章 RabbitMQ消息可靠性分析 )
Rabbitmq与spring整合之重要组件介绍——AMQP声明式配置&RabbitTemplate组件
上一节是使用rabbitAdmin的管理组件进行声明队列,交换器,绑定等操作,本节则是采用AMQP声明式配置来声明这些东西。AMQP声明主要是通过@Bean注解进行的。
配置:
1 package com.zxy.demo.config; 2 3 import org.springframework.amqp.core.Binding; 4 import org.springframework.amqp.core.BindingBuilder; 5 import org.springframework.amqp.core.DirectExchange; 6 import org.springframework.amqp.core.FanoutExchange; 7 import org.springframework.amqp.core.Queue; 8 import org.springframework.amqp.core.TopicExchange; 9 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 10 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 11 import org.springframework.amqp.rabbit.core.RabbitAdmin; 12 import org.springframework.amqp.rabbit.core.RabbitTemplate; 13 import org.springframework.context.annotation.Bean; 14 import org.springframework.context.annotation.ComponentScan; 15 import org.springframework.context.annotation.Configuration; 16 17 18 @Configuration 19 @ComponentScan(basePackages= {"com.zxy.demo.*"}) 20 public class RabbitmqCofing { 21 // 注入连接工厂,spring的配置,springboot可以配置在属性文件中 22 @Bean 23 public ConnectionFactory connectionFactory() { 24 CachingConnectionFactory connection = new CachingConnectionFactory(); 25 connection.setAddresses("192.168.10.110:5672"); 26 connection.setUsername("guest"); 27 connection.setPassword("guest"); 28 connection.setVirtualHost("/"); 29 return connection; 30 } 31 // 配置RabbitAdmin来管理rabbit 32 @Bean 33 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { 34 RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); 35 //用RabbitAdmin一定要配置这个,spring加载的是后就会加载这个类================特别重要 36 rabbitAdmin.setAutoStartup(true); 37 return rabbitAdmin; 38 } 39 //===========================以上结合测试rabbitAdmin部分=========================================================== 40 41 42 43 //===========================以下为AMQP配置队列绑定等,spring容器加载时候就能够注入=========================================================== 44 // 采用AMQP定义队列、交换器、绑定等 45 @Bean(name="direct.queue01") 46 public Queue queue001() { 47 return new Queue("direct.queue01", true, false, false); 48 } 49 @Bean(name="test.direct01") 50 public DirectExchange directExchange() { 51 return new DirectExchange("test.direct01", true, false, null); 52 } 53 @Bean 54 public Binding bind001() { 55 return BindingBuilder.bind(queue001()).to(directExchange()).with("mq.#"); 56 } 57 @Bean(name="topic.queue01") 58 public Queue queue002() { 59 return new Queue("topic.queue01", true, false, false); 60 } 61 @Bean(name="test.topic01") 62 public TopicExchange topicExchange() { 63 return new TopicExchange("test.topic01", true, false, null); 64 } 65 @Bean 66 public Binding bind002() { 67 return BindingBuilder.bind(queue002()).to(topicExchange()).with("mq.topic"); 68 } 69 @Bean(name="fanout.queue01") 70 public Queue queue003() { 71 return new Queue("fanout.queue", true, false, false); 72 } 73 @Bean(name="test.fanout01") 74 public FanoutExchange fanoutExchange() { 75 return new FanoutExchange("test.fanout01", true, false, null); 76 } 77 @Bean 78 public Binding bind003() { 79 return BindingBuilder.bind(queue003()).to(fanoutExchange()); 80 } 81 82 83 84 //===========================注入rabbitTemplate组件=========================================================== 85 // 跟spring整合注入改模板,跟springboot整合的话只需要在配置文件中配置即可 86 @Bean 87 public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { 88 RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 89 return rabbitTemplate; 90 } 91 }
单元测试:
1 package com.zxy.demo; 2 3 import org.junit.Test; 4 import org.junit.runner.RunWith; 5 import org.springframework.amqp.AmqpException; 6 import org.springframework.amqp.core.Binding; 7 import org.springframework.amqp.core.BindingBuilder; 8 import org.springframework.amqp.core.DirectExchange; 9 import org.springframework.amqp.core.FanoutExchange; 10 import org.springframework.amqp.core.Message; 11 import org.springframework.amqp.core.MessageDeliveryMode; 12 import org.springframework.amqp.core.MessagePostProcessor; 13 import org.springframework.amqp.core.MessageProperties; 14 import org.springframework.amqp.core.Queue; 15 import org.springframework.amqp.core.TopicExchange; 16 import org.springframework.amqp.rabbit.core.RabbitAdmin; 17 import org.springframework.amqp.rabbit.core.RabbitTemplate; 18 import org.springframework.beans.factory.annotation.Autowired; 19 import org.springframework.boot.test.context.SpringBootTest; 20 import org.springframework.test.context.junit4.SpringRunner; 21 22 23 @RunWith(SpringRunner.class) 24 @SpringBootTest 25 public class ForwardApplicationTests { 26 27 @Test 28 public void contextLoads() { 29 } 30 @Autowired 31 private RabbitAdmin rabbitAdmin; 32 @Test 33 public void testAdmin() { 34 // 切记命名不能重复复 35 rabbitAdmin.declareQueue(new Queue("test.direct.queue")); 36 rabbitAdmin.declareExchange(new DirectExchange("test.direct")); 37 rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "mq.direct", null)); 38 39 rabbitAdmin.declareQueue(new Queue("test.topic.queue", true,false, false)); 40 rabbitAdmin.declareExchange(new TopicExchange("test.topic", true,false)); 41 // 如果注释掉上面两句实现声明,直接进行下面的绑定竟然不行,该版本amqp-client采用的是5.1.2,将上面两行代码放开,则运行成功 42 rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.topic.queue", true,false, false)) 43 .to(new TopicExchange("test.topic", true,false)).with("mq.topic")); 44 // 经过实验确实是需要先声明,才可以运行通过 45 rabbitAdmin.declareQueue(new Queue("test.fanout.queue",true,false,false,null)); 46 rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", true, false, null)); 47 rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.fanout.queue", true, false,false)) 48 .to(new FanoutExchange("test.fanout", true, false))); 49 rabbitAdmin.purgeQueue("test.direct.queue", false);//清空队列消息 50 } 51 @Autowired 52 private RabbitTemplate rabbitTemplate; 53 @Test 54 public void testTemplate() { 55 String body = "hello,test rabbitTemplage!"; 56 MessageProperties properties = new MessageProperties(); 57 properties.setContentEncoding("utf-8"); 58 properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); 59 properties.setPriority(1); 60 properties.setHeader("nihao:", "yes!"); 61 Message message = new Message(body.getBytes(), properties); 62 // MessagePostProcessor参数是在消息发送过程中动态修改消息属性的类 63 rabbitTemplate.convertAndSend("test.direct01", "mq.direct", message,new MessagePostProcessor() { 64 65 @Override 66 public Message postProcessMessage(Message message) throws AmqpException { 67 // 修改属性 68 message.getMessageProperties().setHeader("nihao:", "no"); 69 // 添加属性 70 message.getMessageProperties().setHeader("新添加属性:", "添加属性1"); 71 return message; 72 } 73 }); 74 75 76 // 发送objcet类型 77 rabbitTemplate.convertAndSend("test.topic01", "mq.topic", "send object type message!!!"); 78 System.out.println("发送完毕!!!"); 79 } 80 81 }
以上是关于Spring AMQP杂记之AMQP基本概念的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ 核心概念及与 Spring Boot 2 的整合