RabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(消费者)

Posted 嘉禾嘉宁papa

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(消费者)相关的知识,希望对你有一定的参考价值。

一、简介

  消息中间件具有一系列功能如低耦合、可靠投递、广播、流量控制、最终一致性等,成为异步RPC的主要手段之一,常见的ActiveMQ、RabbitMQ、Kafka、RocketMQ等。消息中间件主要作用如下:

  • 异步处理
  • 应用解耦
  • 流量削峰
  • 日志处理

  RabbitMQ默认容器是simple容器,从2.0版本之后就多了一个容器direct容器,我们从分布式架构的角度一起来看看到底有什么不同吧。本文主要用使用Spring Boot(2.5.2)来整合RabbitMQ(2.5.2),使用simple容器实现一个消费者。本文的前提是有一个安装好的RabbitMQ的环境。

1.1 本文中注解说明

  • @Configuration:用于定义配置类,被注解的类可以包含有一个或多个被@Bean注解的方法,这些方法将会被AnnotationConfigApplicationContextAnnotationConfigWebApplicationContext类进行扫描,并用于构建bean定义,初始化Spring容器
  • @RabbitListener:@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。 @RabbitListener标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理
  • @RabbitHandler: 需配合 @RabbitListener注解一起使用,当收到消息后,根据 MessageConverter 转换后的参数类型调用相关的方法

二、Maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
        <relativePath/>
    </parent>

    <groupId>com.alian</groupId>
    <artifactId>rabbitmq-simple</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-simple</name>
    <description>SpringBoot整合RabbitMQ之simple容器(消费者)</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>${parent.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${parent.version}</version>
        </dependency>

        <!--rabbitMq的版本 版本最好和springboot保持一致-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>${parent.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <!--用于序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.10</version>
        </dependency>

        <!--java 8时间序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.9.10</version>
        </dependency>

        <!--自己打包上传到私服的,用于测试-->
        <dependency>
            <groupId>com.alian</groupId>
            <artifactId>common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

  这里需要注意的是下面这个包,是我本人打包到私服的,其实一个员工类,支付类,加上一个常量类,大家也自己打包自己的实体到私服,或者直接通过模块开发的方式实现我这个实例。

<dependency>
    <groupId>com.alian</groupId>
    <artifactId>common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

三、配置类

3.1 基础配置

  本文开始说了,会从分布式架构的角度来完成本次整合。我们会用两个系统消费者和生产者来完成本次的测试,采用这种麻烦的方式,不是不会单元测试或者模块开发,而是为了避免大家踩坑,这样也比较贴近我们实际开发。所以我提前准备了几个简单的类(MQConstants.javaEmployee.javaPayRecord.java)打成一个jar包到maven私服,一个配置类存放MQ常量,一个员工类,一个支付类,里面的属性都是常用类型,具体如下。

MQConstants .java

package com.alian.common.constant;

public class MQConstants {

    /**
     * 交换机
     */
    public final static String ALIAN_EXCHANGE_NAME = "ALIAN_EXCHANGE";

    /**
     * 队列名
     */
    public final static String ALIAN_QUEUE_NAME = "ALIAN_QUEUE";

    /**
     * 路由key
     */
    public final static String ALIAN_ROUTINGKEY_NAME = "ALIAN_ROUTINGKEY";
}

Employee.java

package com.alian.common.dto;

import java.io.Serializable;
import java.time.LocalDate;
import java.util.Objects;

public class Employee implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 员工编号
     */
    private String id = "";

    /**
     * 员工姓名
     */
    private String name = "";

    /**
     * 员工年龄
     */
    private int age;

    /**
     * 工资
     */
    private double salary = 0.00;

    /**
     * 部门
     */
    private String department = "";

    /**
     * 入职时间
     */
    private LocalDate hireDate = LocalDate.of(1970, 1, 1);

    /**
     * 注意:被序列化对象应提供一个无参的构造函数,否则会抛出异常
     */
    public Employee() {

    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public double getSalary() {
        return salary;
    }

    public void setSalary(double salary) {
        this.salary = salary;
    }

    public String getDepartment() {
        return department;
    }

    public void setDepartment(String department) {
        this.department = department;
    }

    public LocalDate getHireDate() {
        return hireDate;
    }

    public void setHireDate(LocalDate hireDate) {
        this.hireDate = hireDate;
    }

    @Override
    public String toString() {
        return "Employee{" +
                "id='" + id + '\\'' +
                ", name='" + name + '\\'' +
                ", age=" + age +
                ", salary=" + salary +
                ", department='" + department + '\\'' +
                ", hireDate=" + hireDate +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Employee)) return false;
        Employee employee = (Employee) o;
        return getAge() == employee.getAge() &&
                Double.compare(employee.getSalary(), getSalary()) == 0 &&
                Objects.equals(getId(), employee.getId()) &&
                Objects.equals(getName(), employee.getName()) &&
                Objects.equals(getDepartment(), employee.getDepartment()) &&
                Objects.equals(getHireDate(), employee.getHireDate());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getId(), getName(), getAge(), getSalary(), getDepartment(), getHireDate());
    }
}

PayRecord.java

package com.alian.common.dto;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Objects;

public class PayRecord implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 支付流水
     */
    private String payTranSeq = "";

    /**
     * 支付金额(单位分)
     */
    private int payAmount;

    /**
     * 支付方式(00:现金,01:微信,02:支付宝,03:银联,04:其他)
     */
    private String payType = "01";

    /**
     * 支付状态(00:支付成功,01:待支付,02:支付失败,03:已取消)
     */
    private String status = "01";

    /**
     * 支付时间
     */
    private LocalDateTime payTime = LocalDateTime.now();

    /**
     * 第三方流水
     */
    private String payNo = "";

    /**
     * 注意:被序列化对象应提供一个无参的构造函数,否则会抛出异常
     */
    public PayRecord() {

    }

    public String getPayTranSeq() {
        return payTranSeq;
    }

    public void setPayTranSeq(String payTranSeq) {
        this.payTranSeq = payTranSeq;
    }

    public int getPayAmount() {
        return payAmount;
    }

    public void setPayAmount(int payAmount) {
        this.payAmount = payAmount;
    }

    public String getPayType() {
        return payType;
    }

    public void setPayType(String payType) {
        this.payType = payType;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public LocalDateTime getPayTime() {
        return payTime;
    }

    public void setPayTime(LocalDateTime payTime) {
        this.payTime = payTime;
    }

    public String getPayNo() {
        return payNo;
    }

    public void setPayNo(String payNo) {
        this.payNo = payNo;
    }

    @Override
    public String toString() {
        return "PayRecord{" +
                "payTranSeq='" + payTranSeq + '\\'' +
                ", payAmount=" + payAmount +
                ", payType='" + payType + '\\'' +
                ", status='" + status + '\\'' +
                ", payTime=" + payTime +
                ", payNo='" + payNo + '\\'' +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PayRecord)) return false;
        PayRecord payRecord = (PayRecord) o;
        return getPayAmount() == payRecord.getPayAmount() &&
                Objects.equals(getPayTranSeq(), payRecord.getPayTranSeq()) &&
                Objects.equals(getPayType(), payRecord.getPayType()) &&
                Objects.equals(getStatus(), payRecord.getStatus()) &&
                Objects.equals(getPayTime(), payRecord.getPayTime()) &&
                Objects.equals(getPayNo(), payRecord.getPayNo());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getPayTranSeq(), getPayAmount(), getPayType(), getStatus(), getPayTime(), getPayNo());
    }
}

3.2 交换机、路由、队列配置

  具体的解释,我相信代码里说得很清楚了。com.alian.common.constant.MQConstants是我公共包里的,具体的代码在上面的MQConstants.java

ExchangeConfig.java

package com.alian.rabbitmq.config;

import com.alian.common.constant.MQConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import orgRabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(消费者)

springboot 整合 rabbitmq 转载https://www.cnblogs.com/hlhdidi/p/6535677.html

SpringBoot系列5SpringBoot整合RabbitMQ

RabbitMQ整合SpringBoot

RabbitMQ整合SpringBoot

RabbitMQ——springboot整合RabbitMQ