RabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(消费者)
Posted 嘉禾嘉宁papa
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(消费者)相关的知识,希望对你有一定的参考价值。
目录
一、简介
消息中间件具有一系列功能如低耦合、可靠投递、广播、流量控制、最终一致性等,成为异步RPC的主要手段之一,常见的ActiveMQ、RabbitMQ、Kafka、RocketMQ等。消息中间件主要作用如下:
- 异步处理
- 应用解耦
- 流量削峰
- 日志处理
RabbitMQ默认容器是simple容器,从2.0版本之后就多了一个容器direct容器,我们从分布式架构的角度一起来看看到底有什么不同吧。本文主要用使用Spring Boot(2.5.2)来整合RabbitMQ(2.5.2),使用simple容器实现一个消费者。本文的前提是有一个安装好的RabbitMQ的环境。
1.1 本文中注解说明
- @Configuration:用于定义配置类,被注解的类可以包含有一个或多个被@Bean注解的方法,这些方法将会被AnnotationConfigApplicationContext或AnnotationConfigWebApplicationContext类进行扫描,并用于构建bean定义,初始化Spring容器
- @RabbitListener:@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。 @RabbitListener标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理
- @RabbitHandler: 需配合 @RabbitListener注解一起使用,当收到消息后,根据 MessageConverter 转换后的参数类型调用相关的方法
二、Maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath/>
</parent>
<groupId>com.alian</groupId>
<artifactId>rabbitmq-simple</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq-simple</name>
<description>SpringBoot整合RabbitMQ之simple容器(消费者)</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${parent.version}</version>
</dependency>
<!--rabbitMq的版本 版本最好和springboot保持一致-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!--用于序列化-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10</version>
</dependency>
<!--java 8时间序列化-->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.9.10</version>
</dependency>
<!--自己打包上传到私服的,用于测试-->
<dependency>
<groupId>com.alian</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
这里需要注意的是下面这个包,是我本人打包到私服的,其实一个员工类,支付类,加上一个常量类,大家也自己打包自己的实体到私服,或者直接通过模块开发的方式实现我这个实例。
<dependency>
<groupId>com.alian</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
三、配置类
3.1 基础配置
本文开始说了,会从分布式架构的角度来完成本次整合。我们会用两个系统消费者和生产者来完成本次的测试,采用这种麻烦的方式,不是不会单元测试或者模块开发,而是为了避免大家踩坑,这样也比较贴近我们实际开发。所以我提前准备了几个简单的类(MQConstants.java、Employee.java、PayRecord.java)打成一个jar包到maven私服,一个配置类存放MQ常量,一个员工类,一个支付类,里面的属性都是常用类型,具体如下。
MQConstants .java
package com.alian.common.constant;
public class MQConstants {
/**
* 交换机
*/
public final static String ALIAN_EXCHANGE_NAME = "ALIAN_EXCHANGE";
/**
* 队列名
*/
public final static String ALIAN_QUEUE_NAME = "ALIAN_QUEUE";
/**
* 路由key
*/
public final static String ALIAN_ROUTINGKEY_NAME = "ALIAN_ROUTINGKEY";
}
Employee.java
package com.alian.common.dto;
import java.io.Serializable;
import java.time.LocalDate;
import java.util.Objects;
public class Employee implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 员工编号
*/
private String id = "";
/**
* 员工姓名
*/
private String name = "";
/**
* 员工年龄
*/
private int age;
/**
* 工资
*/
private double salary = 0.00;
/**
* 部门
*/
private String department = "";
/**
* 入职时间
*/
private LocalDate hireDate = LocalDate.of(1970, 1, 1);
/**
* 注意:被序列化对象应提供一个无参的构造函数,否则会抛出异常
*/
public Employee() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public double getSalary() {
return salary;
}
public void setSalary(double salary) {
this.salary = salary;
}
public String getDepartment() {
return department;
}
public void setDepartment(String department) {
this.department = department;
}
public LocalDate getHireDate() {
return hireDate;
}
public void setHireDate(LocalDate hireDate) {
this.hireDate = hireDate;
}
@Override
public String toString() {
return "Employee{" +
"id='" + id + '\\'' +
", name='" + name + '\\'' +
", age=" + age +
", salary=" + salary +
", department='" + department + '\\'' +
", hireDate=" + hireDate +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Employee)) return false;
Employee employee = (Employee) o;
return getAge() == employee.getAge() &&
Double.compare(employee.getSalary(), getSalary()) == 0 &&
Objects.equals(getId(), employee.getId()) &&
Objects.equals(getName(), employee.getName()) &&
Objects.equals(getDepartment(), employee.getDepartment()) &&
Objects.equals(getHireDate(), employee.getHireDate());
}
@Override
public int hashCode() {
return Objects.hash(getId(), getName(), getAge(), getSalary(), getDepartment(), getHireDate());
}
}
PayRecord.java
package com.alian.common.dto;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Objects;
public class PayRecord implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 支付流水
*/
private String payTranSeq = "";
/**
* 支付金额(单位分)
*/
private int payAmount;
/**
* 支付方式(00:现金,01:微信,02:支付宝,03:银联,04:其他)
*/
private String payType = "01";
/**
* 支付状态(00:支付成功,01:待支付,02:支付失败,03:已取消)
*/
private String status = "01";
/**
* 支付时间
*/
private LocalDateTime payTime = LocalDateTime.now();
/**
* 第三方流水
*/
private String payNo = "";
/**
* 注意:被序列化对象应提供一个无参的构造函数,否则会抛出异常
*/
public PayRecord() {
}
public String getPayTranSeq() {
return payTranSeq;
}
public void setPayTranSeq(String payTranSeq) {
this.payTranSeq = payTranSeq;
}
public int getPayAmount() {
return payAmount;
}
public void setPayAmount(int payAmount) {
this.payAmount = payAmount;
}
public String getPayType() {
return payType;
}
public void setPayType(String payType) {
this.payType = payType;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public LocalDateTime getPayTime() {
return payTime;
}
public void setPayTime(LocalDateTime payTime) {
this.payTime = payTime;
}
public String getPayNo() {
return payNo;
}
public void setPayNo(String payNo) {
this.payNo = payNo;
}
@Override
public String toString() {
return "PayRecord{" +
"payTranSeq='" + payTranSeq + '\\'' +
", payAmount=" + payAmount +
", payType='" + payType + '\\'' +
", status='" + status + '\\'' +
", payTime=" + payTime +
", payNo='" + payNo + '\\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof PayRecord)) return false;
PayRecord payRecord = (PayRecord) o;
return getPayAmount() == payRecord.getPayAmount() &&
Objects.equals(getPayTranSeq(), payRecord.getPayTranSeq()) &&
Objects.equals(getPayType(), payRecord.getPayType()) &&
Objects.equals(getStatus(), payRecord.getStatus()) &&
Objects.equals(getPayTime(), payRecord.getPayTime()) &&
Objects.equals(getPayNo(), payRecord.getPayNo());
}
@Override
public int hashCode() {
return Objects.hash(getPayTranSeq(), getPayAmount(), getPayType(), getStatus(), getPayTime(), getPayNo());
}
}
3.2 交换机、路由、队列配置
具体的解释,我相信代码里说得很清楚了。com.alian.common.constant.MQConstants是我公共包里的,具体的代码在上面的MQConstants.java
ExchangeConfig.java
package com.alian.rabbitmq.config;
import com.alian.common.constant.MQConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import orgRabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(消费者)
springboot 整合 rabbitmq 转载https://www.cnblogs.com/hlhdidi/p/6535677.html