Spring AMQP杂记之AMQP基本概念

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring AMQP杂记之AMQP基本概念相关的知识,希望对你有一定的参考价值。

参考技术A 接下来步入正题:AMQP本质是一个消息队列协议而并非一个框架,我们熟知的RabbitMQ,RocketMQ才是实现了AMQP的消息队列框架。Spring默认的AMQP实现也是唯一的实现是RabbitMQ。至于阿里的火箭MQ,apache的activeMQ就没有提供实现啦。至于以上这些MQ框架孰优孰劣,要根据实际情况自己判断了。本人更偏向于使用RabbitMQ,一个因为是Spring对其进行了封装,使用起来比较简单。。。(最主要的原因),二是因为本身他是Erlang语言编写,对于并发的支持较好,而且网上的各家MQ的性能测试对比中表现的也比较好。

首先介绍AMQP中几个重要的概念:下面这张图片是RabbitMQ官网中对于AMQP模型的抽象

如图,整个消息流转的过程是:生产者将消息发送到Exchange中,Exchange再将消息根据一定的规则路由到指定的队列中,最后消费者从指定的队列中获取消息。其中被方框圈起来的组件可以称为Broker,即MQ服务器,Publisher和Consumer就是我们自己实现的生产者和消费者。接下来详细介绍一下上面的几个组件。

Exchange(交换机)类型:他是生产者发送消息的地方,交换机收到消息后将消息路由到0或多个队列中,AMQP一共定义了4种交换机,如图:

其实还有一个Default Exchange,只不过它是被MQ服务器提前声明的一种没有名字的Direct exchange。但是它有一个很重要的属性,创建的每一个队列都会自动的用队列名作为路由键(路由key路由的规则)绑定到该exchange中。打个比方,比如你创建了一个名称为“talk-is-cheap——show-me-the-code”的队列,MQ会自动的用“talk-is-cheap——show-me-the-code”作为路由key绑定到Default exchange,所以当生产者发送的消息携带名为“talk-is-cheap——show-me-the-code”的路由key时就会发送到声明的队列中去了。

Direct exchange:

他是根据路由key将消息发送到指定队列,比较适合一对一的场景。工作流程如下:声明队列时指定一个路由key绑定到exchange,当有携带相同key的消息到达exchange中时就会自动的转发给该队列了。

Fanout  exchange:

如图,该交换机是将消息转发到所有绑定到该交换机上的队列,这个时候如果你发给交换机的消息携带路由key,他也会忽略掉。这个场景最适合的就是广播了,类似村书记喇叭一喊,大家都能听到了。这里贴几个官网对于该交换机几个使用场景的介绍。

Topic Exchange:

他通过一定的路由策略将符合条件的路由key转发到多个符合路由策略的队列上,和直连交换机类似,只不过他匹配的是key的模式,类似于正则匹配。其中*代表任意的一个词,#代表0或多个词。使用场景比如这个任务队列里的消息需要多个消费者处理,而每个消费者只能处理特定的任务。

Headers Exchange:

类似于http的head,可以设置多个属性,根据属性进行队列匹配。首部交换机也会忽略路由键。当有多个首部的时候怎么知道该匹配哪一个呢?交换机有一个“x-match”参数,设置为any表示任意一个匹配即可,设置为all则表示所有的头部信息都匹配才会发送到指定队列。这个交换机工作中不咋用,好像略微有点鸡肋。

呃,这个其实和普通的队列没啥区别,只不过它里面可以设置一些额外的属性,例如持久化、自动删除等。有一个地方需要注意,如果不能将AMQP消息路由到任何队列(例如,路由键没有对应的队列),则可以根据发布者设置的消息属性,将其删除或返回给发布者。这个设置在保证消息可靠性时是十分重要的。(关于如何保证消息可靠性可以看这篇文章 RabbitMQ消息可靠性分析 )

Rabbitmq与spring整合之重要组件介绍——AMQP声明式配置&RabbitTemplate组件

上一节是使用rabbitAdmin的管理组件进行声明队列,交换器,绑定等操作,本节则是采用AMQP声明式配置来声明这些东西。AMQP声明主要是通过@Bean注解进行的。

配置:

 1 package com.zxy.demo.config;
 2 
 3 import org.springframework.amqp.core.Binding;
 4 import org.springframework.amqp.core.BindingBuilder;
 5 import org.springframework.amqp.core.DirectExchange;
 6 import org.springframework.amqp.core.FanoutExchange;
 7 import org.springframework.amqp.core.Queue;
 8 import org.springframework.amqp.core.TopicExchange;
 9 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
10 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
11 import org.springframework.amqp.rabbit.core.RabbitAdmin;
12 import org.springframework.amqp.rabbit.core.RabbitTemplate;
13 import org.springframework.context.annotation.Bean;
14 import org.springframework.context.annotation.ComponentScan;
15 import org.springframework.context.annotation.Configuration;
16 
17 
18 @Configuration
19 @ComponentScan(basePackages= {"com.zxy.demo.*"})
20 public class RabbitmqCofing {
21 //    注入连接工厂,spring的配置,springboot可以配置在属性文件中
22     @Bean
23     public ConnectionFactory connectionFactory() {
24         CachingConnectionFactory connection = new CachingConnectionFactory();
25         connection.setAddresses("192.168.10.110:5672");
26         connection.setUsername("guest");
27         connection.setPassword("guest");
28         connection.setVirtualHost("/");
29         return connection;
30     }
31 //    配置RabbitAdmin来管理rabbit
32     @Bean
33     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
34         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
35         //用RabbitAdmin一定要配置这个,spring加载的是后就会加载这个类================特别重要
36         rabbitAdmin.setAutoStartup(true);
37         return rabbitAdmin;
38     }
39 //===========================以上结合测试rabbitAdmin部分===========================================================    
40     
41     
42     
43 //===========================以下为AMQP配置队列绑定等,spring容器加载时候就能够注入===========================================================    
44 //    采用AMQP定义队列、交换器、绑定等
45     @Bean(name="direct.queue01")
46     public Queue queue001() {
47         return new Queue("direct.queue01", true, false, false);
48     }
49     @Bean(name="test.direct01")
50     public DirectExchange directExchange() {
51         return new DirectExchange("test.direct01", true, false, null);
52     }
53     @Bean
54     public Binding bind001() {
55         return BindingBuilder.bind(queue001()).to(directExchange()).with("mq.#");
56     }
57     @Bean(name="topic.queue01")
58     public Queue queue002() {
59         return new Queue("topic.queue01", true, false, false);
60     }
61     @Bean(name="test.topic01")
62     public TopicExchange topicExchange() {
63         return new TopicExchange("test.topic01", true, false, null);
64     }
65     @Bean
66     public Binding bind002() {
67         return BindingBuilder.bind(queue002()).to(topicExchange()).with("mq.topic");
68     }
69     @Bean(name="fanout.queue01")
70     public Queue queue003() {
71         return new Queue("fanout.queue", true, false, false);
72     }
73     @Bean(name="test.fanout01")
74     public FanoutExchange fanoutExchange() {
75         return new FanoutExchange("test.fanout01", true, false, null);
76     }
77     @Bean
78     public Binding bind003() {
79         return BindingBuilder.bind(queue003()).to(fanoutExchange());
80     }
81     
82     
83     
84 //===========================注入rabbitTemplate组件===========================================================    
85 //    跟spring整合注入改模板,跟springboot整合的话只需要在配置文件中配置即可
86     @Bean 
87     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
88         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
89         return rabbitTemplate;
90     }
91 }

单元测试:

 1 package com.zxy.demo;
 2 
 3 import org.junit.Test;
 4 import org.junit.runner.RunWith;
 5 import org.springframework.amqp.AmqpException;
 6 import org.springframework.amqp.core.Binding;
 7 import org.springframework.amqp.core.BindingBuilder;
 8 import org.springframework.amqp.core.DirectExchange;
 9 import org.springframework.amqp.core.FanoutExchange;
10 import org.springframework.amqp.core.Message;
11 import org.springframework.amqp.core.MessageDeliveryMode;
12 import org.springframework.amqp.core.MessagePostProcessor;
13 import org.springframework.amqp.core.MessageProperties;
14 import org.springframework.amqp.core.Queue;
15 import org.springframework.amqp.core.TopicExchange;
16 import org.springframework.amqp.rabbit.core.RabbitAdmin;
17 import org.springframework.amqp.rabbit.core.RabbitTemplate;
18 import org.springframework.beans.factory.annotation.Autowired;
19 import org.springframework.boot.test.context.SpringBootTest;
20 import org.springframework.test.context.junit4.SpringRunner;
21 
22 
23 @RunWith(SpringRunner.class)
24 @SpringBootTest
25 public class ForwardApplicationTests {
26 
27     @Test
28     public void contextLoads() {
29     }
30     @Autowired
31     private RabbitAdmin rabbitAdmin;
32     @Test
33     public void testAdmin() {
34 //        切记命名不能重复复
35         rabbitAdmin.declareQueue(new Queue("test.direct.queue"));
36         rabbitAdmin.declareExchange(new DirectExchange("test.direct"));
37         rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "mq.direct", null));
38 
39         rabbitAdmin.declareQueue(new Queue("test.topic.queue", true,false, false));
40         rabbitAdmin.declareExchange(new TopicExchange("test.topic", true,false));
41 //        如果注释掉上面两句实现声明,直接进行下面的绑定竟然不行,该版本amqp-client采用的是5.1.2,将上面两行代码放开,则运行成功
42         rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.topic.queue", true,false, false))
43                 .to(new TopicExchange("test.topic", true,false)).with("mq.topic"));
44 //        经过实验确实是需要先声明,才可以运行通过
45         rabbitAdmin.declareQueue(new Queue("test.fanout.queue",true,false,false,null));
46         rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", true, false, null));
47         rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.fanout.queue", true, false,false))
48                 .to(new FanoutExchange("test.fanout", true, false)));
49         rabbitAdmin.purgeQueue("test.direct.queue", false);//清空队列消息
50     }
51     @Autowired
52     private RabbitTemplate rabbitTemplate;
53     @Test
54     public void testTemplate() {
55         String body = "hello,test rabbitTemplage!";
56         MessageProperties properties = new MessageProperties();
57         properties.setContentEncoding("utf-8");
58         properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
59         properties.setPriority(1);
60         properties.setHeader("nihao:", "yes!");
61         Message message = new Message(body.getBytes(), properties);
62 //        MessagePostProcessor参数是在消息发送过程中动态修改消息属性的类
63         rabbitTemplate.convertAndSend("test.direct01", "mq.direct", message,new MessagePostProcessor() {
64             
65             @Override
66             public Message postProcessMessage(Message message) throws AmqpException {
67 //                修改属性
68                 message.getMessageProperties().setHeader("nihao:", "no");
69 //                添加属性
70                 message.getMessageProperties().setHeader("新添加属性:", "添加属性1");
71                 return message;
72             }
73         });
74         
75         
76 //        发送objcet类型
77         rabbitTemplate.convertAndSend("test.topic01", "mq.topic", "send object type message!!!");
78         System.out.println("发送完毕!!!");
79     }
80 
81 }

 

以上是关于Spring AMQP杂记之AMQP基本概念的主要内容,如果未能解决你的问题,请参考以下文章

Spring AMQP项目

RabbitMQ与AMQP协议

RabbitMQ 核心概念及与 Spring Boot 2 的整合

消息中间件系列二:RabbitMQ入门(基本概念RabbitMQ的安装和运行)

Spring消息之AMQP.

Spring Boot 2.X - Spring Boot整合AMQP之RabbitMQ