SpringBoot整合RabbitMQ

Posted 勤奋的木比白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合RabbitMQ相关的知识,希望对你有一定的参考价值。

MQ

  • 消息可靠性
  • 消息幂等性
  • MQ的高可用

基本概念

MQ,Message Queue消息队列,是消息传输过程中保存消息的容器,多用于分布式系统之间进行通信。

  • MQ,消息队列,存储消息的中间件
  • 分布式系统通信方式:直接远程调用 、借助第三方完成间接通信
  • 发送方为生产者、接收方为消费者

优劣

优势

  • 应用解耦:提高系统容错性与可维护性
  • 异步提速:提高用户体验与系统吞吐量
  • 消费填谷:提高系统稳定性

劣势

  • 系统复杂度提高
  • 引入rabbitmq产生的新问题,如:网络通信、数据一致性、幂等性等

使用条件

  • 生产者无需从消费者处得到反馈
  • 允许数据短暂的不一致性
  • 引入MQ效益明显高于不引入时的效益

常见产品

  • RabbitMQ:延迟最低
  • AcitveMQ:老牌MQ,性能最低
  • RocketMQ:高吞吐量,高并发、分布式MQ、金融
  • Kafka:高吞吐量,大数据方面

AMQP协议

RabbitMQ

  • Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker

  • Virtual host:处于多租户和安全因素设计,把AMQP的基本组件分到一个虚拟的分组中,类似与网络中的namespace概念(VCP,专有网络),当多个不用用户使用同一个RabbitMQ Server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange、queue等

  • Connection:publisher/consumer和broker之间的TCP连接,每个Connection中有多个channel(类似连接池,避免重复创建channel损耗性能)

  • Channel:Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,QMQP method包含了channel id帮助客户端和message broker识别channel,所有channel之间是完全隔离的。Channel作为轻量级Connection极大减少了操作系统建立TCP connection的开销

  • Exchange:交换机,message到达broker的第一站,根据分发规则,匹配表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point),topic(publish-subscribe),fanout(multicast)

  • Queue:消息最终被送到这里等待consumer取走

  • Binding:exchange和queue之间的虚拟连接,binding中可以用包含routing key。Binding信息被保存到exchange的查询表中,用于message的分发依据

JMS

  • JMS,Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API.
  • JMS是JavaEE规范中的一种,类比JDBC
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ. RabbitMQ官方没有提供JMS的实现包,但是开源社区有提供.

工作模式(Java客户端版)

简单模式

生产者

<dependencies>
    <!--rabbitmq java客户端-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
</dependencies>
/**
 * 发送消息
 */
public class MessageProducer 
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException 
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置参数
        factory.setHost("43.139.51.247"); // ip 默认值:localhost
        factory.setPort(5672); // 端口 默认值:5672
        factory.setVirtualHost("/zhd"); // 虚拟机 默认值:/
        factory.setUsername("czk");
        factory.setPassword("czk");
        // 3.创建连接Connection
        Connection connection = factory.newConnection();
        // 4.创建Channel
        Channel channel = connection.createChannel();
        // 5.创建队列Queue
        /**
         * public DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列名称,队列名称存在则使用,不存在则创建一个该名称队列
         * durable:是否持久化
         * exclusive:是否独占,只能有一个消费者监听队列,当Connection关闭时是否删除队列
         * autoDelete:是否自动删除,没有Consumer时自动删除
         * arguments:参数
         */
        channel.queueDeclare("hello_world",true,false,false,null);
        // 6.发送消息
        /**
         * public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:交换机名称。简单模式下交换机默认为 ""
         * routingKey:路由名称。注意:使用默认交换机,则routingKey要与队列名一致才能正常路由
         * props:配置信息
         * body:消息数据
         */

        String body = "hello rabbitmq~~~";
        channel.basicPublish("","hello_world",null,body.getBytes());

        // 7.释放资源
        Thread.sleep(30000);
        System.out.println("end...");
        channel.close();
        connection.close();
    

消费者

<dependencies>
    <!--rabbitmq java客户端-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
</dependencies>
/**
 * 接收/消费消息
 */
public class MessageConsumer 
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException 
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置参数
        factory.setHost("43.139.51.247"); // ip 默认值:localhost
        factory.setPort(5672); // 端口 默认值:5672
        factory.setVirtualHost("/zhd"); // 虚拟机 默认值:/
        factory.setUsername("czk");
        factory.setPassword("czk");
        // 3.创建连接Connection
        Connection connection = factory.newConnection();
        // 4.创建Channel
        Channel channel = connection.createChannel();
        // 5.创建队列Queue
        /**
         * public DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列名称,队列名称存在则使用,不存在则创建一个该名称队列
         * durable:是否持久化
         * exclusive:是否独占,只能有一个消费者监听队列,当Connection关闭时是否删除队列
         * autoDelete:是否自动删除,没有Consumer时自动删除
         * arguments:参数
         */
        channel.queueDeclare("hello_world",true,false,false,null);
        // 6.接收消息
        /**
         * public String basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:收到消息是否自动确认,消息丢失相关
         * callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel)
            /**
             * 回调方法
             * consumerTag:标识
             * envelope:获取一些消息,交换机、路由key
             * properties:配置消息
             * body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("------------------------------------");
            
        ;
        channel.basicConsume("hello_world",true,consumer);
        // 监听程序,无需关闭资源
    

工作队列模式

生产者

<dependencies>
    <!--rabbitmq java客户端-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-scala-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
/**
 * 发送消息
 */
public class MessageProducer_WorkQueue 
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException 
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置参数
        factory.setHost("43.139.51.247"); // ip 默认值:localhost
        factory.setPort(5672); // 端口 默认值:5672
        factory.setVirtualHost("/zhd"); // 虚拟机 默认值:/
        factory.setUsername("czk");
        factory.setPassword("czk");
        // 3.创建连接Connection
        Connection connection = factory.newConnection();
        // 4.创建Channel
        Channel channel = connection.createChannel();
        // 5.创建队列Queue
        /**
         * public DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列名称,队列名称存在则使用,不存在则创建一个该名称队列
         * durable:是否持久化
         * exclusive:是否独占,只能有一个消费者监听队列,当Connection关闭时是否删除队列
         * autoDelete:是否自动删除,没有Consumer时自动删除
         * arguments:参数
         */
        channel.queueDeclare("work_queue",true,false,false,null);
        // 6.发送消息
        /**
         * public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:交换机名称。简单模式下交换机默认为 ""
         * routingKey:路由名称。注意:使用默认交换机,则routingKey要与队列名一致才能正常路由
         * props:配置信息
         * body:消息数据
         */
        for (int i = 0;i<10;i++)
            String body = "hello rabbitmq : "+i;
            channel.basicPublish("","work_queue",null,body.getBytes());
        
        // 7.释放资源
        Thread.sleep(30000);
        System.out.println("end...");
        channel.close();
        connection.close();
    

消费者

<dependencies>
    <!--rabbitmq java客户端-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-scala-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
/**
 * 接收/消费消息
 */
public class MessageConsumer_WorkQueue1 
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException 
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置参数
        factory.setHost("43.139.51.247"); // ip 默认值:localhost
        factory.setPort(5672); // 端口 默认值:5672
        factory.setVirtualHost("/zhd"); // 虚拟机 默认值:/
        factory.setUsername("czk");
        factory.setPassword("czk");
        // 3.创建连接Connection
        Connection connection = factory.newConnection();
        // 4.创建Channel
        Channel channel = connection.createChannel();
        // 5.创建队列Queue
        /**
         * public DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列名称,队列名称存在则使用,不存在则创建一个该名称队列
         * durable:是否持久化
         * exclusive:是否独占,只能有一个消费者监听队列,当Connection关闭时是否删除队列
         * autoDelete:是否自动删除,没有Consumer时自动删除
         * arguments:参数
         */
        channel.queueDeclare("work_queue",true,false,false,null);
        // 6.接收消息
        /**
         * public String basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:收到消息是否自动确认,消息丢失相关
         * callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel)
            /**
             * 回调方法
             * consumerTag:标识
             * envelope:获取一些消息,交换机、路由key
             * properties:配置消息
             * body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("------------------------------------");
            
        ;
        channel.basicConsume("work_queue",true,consumer);
        // 监听程序,无需关闭资源
    

/**
 * 接收/消费消息
 */
public class MessageConsumer_WorkQueue2 
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException 
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置参数
        factory.setHost("43.139.51.247"); // ip 默认值:localhost
        factory.setPort(5672); // 端口 默认值:5672
        factory.setVirtualHost("/zhd"); // 虚拟机 默认值:/
        factory.setUsername("czk");
        factory.setPassword("czk");
        // 3.创建连接Connection
        Connection connection = factory.newConnection();
        // 4.创建Channel
        Channel channel = connection.createChannel();
        // 5.创建队列Queue
        /**
         * public DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列名称,队列名称存在则使用,不存在则创建一个该名称队列
         * durable:是否持久化
         * exclusive:是否独占,只能有一个消费者监听队列,当Connection关闭时是否删除队列
         * autoDelete:是否自动删除,没有Consumer时自动删除
         * arguments:参数
         */
        channel.queueDeclare("work_queue",true,false,false,null);
        // 6.接收消息
        /**
         * public String basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:收到消息是否自动确认,消息丢失相关
         * callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel)
            /**
             * 回调方法
             * consumerTag:标识
             * envelope:获取一些消息,交换机、路由key
             * properties:配置消息
             * body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
                System.out.println("------------------------------------");
            
        ;
        channel.basicConsume("work_queue",true,consumer);
        // 监听程序,无需关闭资源
    

Pub/Sub订阅模式

生产者

<dependencies>
    <!--rabbitmq java客户端-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
</dependencies>
/**
 * 发送消息
 */
public class MessageProducer_PubSub 
    public static void main(以上是关于SpringBoot整合RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ—SpringBoot中实现死信队列

SpringBoot+RabbitMQ 死信队列

SpringBoot+RabbitMQ 死信队列

springBoot集成rabbitmq 之延时(死信)队列

SpringBoot+RabbitMQ 死信队列

SpringBoot+RabbitMQ 死信队列