Spring Boot异步消息之AMQP讲解及实战(附源码)
Posted showswoller
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Boot异步消息之AMQP讲解及实战(附源码)相关的知识,希望对你有一定的参考价值。
觉得有帮助请点赞关注收藏~~~
AMQP(高级消息队列协议)是一个提供统一消息服务的应用层标准高级消息队列协议。是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可 传递消息,并不受客户端/中间件的不同产品,不同开发语言等条件的限制。
下面实现主要用RabbitMQ讲解AMQP实例,因此需要事先安装RabbitMQ和erlang语言
erlang下载地址 https://www.erlang.org/downloads
RabbitMQ下载地址 https://www.rabbitmq.com/download.html
使用RabbitMQ实现发布/订阅异步消息模式
1:创建发布者应用ch8_2Sender
2:在pom.xml文件中添加依赖
<?xml version="1.0" encoding="UTF-8"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
-<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/>
<!-- lookup parent from repository -->
</parent>
<groupId>com.ch</groupId>
<artifactId>ch8_2Sender</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>ch8_2Sender</name>
<description>Demo project for Spring Boot</description>
+<properties>
-<dependencies>
-<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
+<dependency>
-<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
-<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
-<build>
-<plugins>
-<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3:创建Weather实体类
package com.ch.ch8_2Sender.entity;
import java.io.Serializable;
public class Weather implements Serializable
private static final long serialVersionUID = -8221467966772683998L;
private String id;
private String city;
private String weatherDetail;
public String getCity()
return city;
public void setCity(String city)
this.city = city;
public String getWeatherDetail()
return weatherDetail;
public void setWeatherDetail(String weatherDetail)
this.weatherDetail = weatherDetail;
public String getId()
return id;
public void setId(String id)
this.id = id;
@Override
public String toString()
return "Weather [id=" + id + ", city=" + city + ", weatherDetail=" + weatherDetail + "]";
4:重写Ch82SenderApplication主类
package com.ch.ch8_2Sender;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.ch.ch8_2Sender.entity.Weather;
import com.fasterxml.jackson.databind.ObjectMapper;
@SpringBootApplication
public class Ch82SenderApplication implements CommandLineRunner
@Autowired
private ObjectMapper objectMapper;
@Autowired
RabbitTemplate rabbitTemplate;
public static void main(String[] args)
SpringApplication.run(Ch82SenderApplication.class, args);
/**
* 定义发布者
*/
@Override
public void run(String... args) throws Exception
//定义消息对象
Weather weather = new Weather();
weather.setId("010");
weather.setCity("北京");
weather.setWeatherDetail("今天晴到多云,南风5-6级,温度19-26°C");
//指定Json转换器,Jackson2JsonMessageConverter默认将消息转换成byte[]类型的消息
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//objectMapper将weather对象转换为JSON字节数组
Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(weather))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
// 消息唯一ID
CorrelationData correlationData = new CorrelationData(weather.getId());
//使用已封装好的convertAndSend(String exchange , String routingKey , Object message, CorrelationData correlationData)
//将特定的路由key发送消息到指定的交换机
rabbitTemplate.convertAndSend(
"weather-exchange", //分发消息的交换机名称
"weather.message", //用来匹配消息的路由Key
msg, //消息体
correlationData);
5:创建订阅者应用ch8_2Receiver-1
package com.ch.ch8_2Receiver1;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* 定义订阅者Receiver1
*/
@Component
public class Receiver1
@Autowired
private ObjectMapper objectMapper;
@RabbitListener(
bindings =
@QueueBinding(
//队列名weather-queue1保证和别的订阅者不一样
value = @Queue(value = "weather-queue1",durable = "true"),
//weather-exchange与发布者的交换机名相同
exchange = @Exchange(value = "weather-exchange",durable = "true",type = "topic"),
//weather.message与发布者的消息的路由Key相同
key = "weather.message"
)
)
@RabbitHandler
public void receiveWeather(@Payload byte[] weatherMessage)throws Exception
System.out.println("-----------订阅者Receiver1接收到消息--------");
//将JSON字节数组转换为Weather对象
Weather w=objectMapper.readValue(weatherMessage, Weather.class);
System.out.println("Receiver1收到的消息内容:"+w);
6:创建订阅者应用ch8_2Receiver-2
package com.ch.ch8_2Receiver1;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* 定义订阅者Receiver2
*/
@Component
public class Receiver2
@Autowired
private ObjectMapper objectMapper;
@RabbitListener(
bindings =
@QueueBinding(
//队列名weather-queue2保证和别的订阅者不一样
value = @Queue(value = "weather-queue2",durable = "true"),
//weather-exchange与发布者的交换机名相同
exchange = @Exchange(value = "weather-exchange",durable = "true",type = "topic"),
//weather.message与发布者的消息的路由Key相同
key = "weather.message"
)
)
@RabbitHandler
public void receiveWeather(@Payload byte[] weatherMessage)throws Exception
System.out.println("-----------订阅者Receiver2接收到消息--------");
Weather w=objectMapper.readValue(weatherMessage, Weather.class);
//将JSON字节数组转换为Weather对象
System.out.println("Receiver2收到的消息内容:"+w);
接下来分别运行发布者和订阅者的主类即可,发现一个发布者发布的消息可以被多个订阅者订阅。
以上是关于Spring Boot异步消息之AMQP讲解及实战(附源码)的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot 之Spring Boot Starter依赖包及作用(自己还没有看)
Sping Boot入门到实战之实战篇:实现自定义Spring Boot Starter——阿里云消息队列服务Starter