消息队列(RabbitMQ)

Posted xue_yun_xiang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列(RabbitMQ)相关的知识,希望对你有一定的参考价值。

一、概述

消息队列作用:解耦异步削峰

解耦

解耦,就是将连个应用不是互相之间强依赖关系,一个挂了,两外一个依然可以使用

削峰

削锋,就是可以将高并发的网络请求 缓存到消息队中,慢慢的执行消息对对应的业务

618 秒杀购买商品
生成订单
扣减库存
扣减余额
发货,生成物流
如果以上 步骤都在一个应用的请求中去执行,在每秒 50000个并发的情况下,是否回对应服务造成很大的压力

优化流程
1.先扣减库存
2将对应商品生成订单的消息,发送到消息队列
3.让应用慢的消费消息 完成一下业务

异步

用户向服务发起请求,我们不会直接告诉用户结果,而是异步处理业务逻辑,稍后通过 通知的形式告诉用户

二、消息队列框架

常用消息队列框架:ActiveMQ,RocketMQ,Kafka,RabbitMQ。

RabbitMQ :特点 简单易用,支持多种语言,保证消息不丢失 可以配合作为分布式事务
Kafka: 特点是吞吐量非常大,适合大数据项目使用 支持多语言
RocketMQ:阿里出品,效率也很好,主要在spingcloud alibaba套件中使用,可以配合作为分布式事务
ActiveMQ:只支持java语言 很少用

三、RabbitMq

介绍

publisher (生产者) :负责生产消息,并且将消息 放到交换机
Exchange - 交换机:交互机 主要用来接收生产者 发送的消息 并且将消息发送给 路由
Routes - 路由:将接收到的消息 根据路由规则 ,发送到不同的消息队列(真正存储数据的地方)
Queue - 队列:就是存放消息的位置(存储再磁盘中)
Consumer - 消费者:负责 将队列中的消息读取出来

安装

使用docker 安装

1、创建

[root@mastera java2102]# mkdir docker-compose-rabbitmq
[root@mastera java2102]# cd docker-compose-rabbitmq/
[root@mastera docker-compose-rabbitmq]#
[root@mastera docker-compose-rabbitmq]# vim docker-compose.yaml

version: “3.1”
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:management
restart: always
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
volumes:
- ./data:/var/lib/rabbitmq

2、启动

[root@mastera docker-compose-rabbitmq]# docker-compose up

3、访问

在edge或者 火狐 不要再Google 访问
15672 后台管理界面 端口 http://192.168.12.130:15672/#/
5672 是客户端通信端口

四、RabbitMQ的通讯方式

1、最简单模式:一个生产者、一个消费者、一个队列

2、多个消费者共享队列模式:一个生产者 、一个队列、多个消费者共享 一个队列

3、发布订阅模式:一个生产者 多个队列,多个消费者

保证一个消息 可以被多个消费者,同时消费

4、路由模式:一个生产者、一个交换机、多个队列、多个消费者

5、topic 模式

五、代码实现以上五种模式

1、创建一个java 工程 并引入依赖

<dependencies>
    <!--
        rabbitmq  相关依赖
    -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.6.0</version>
    </dependency>


    <!-- java 单元测试-->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>

2、创建工具类

/**
 * 创建rabbitmq 客户端
 */
public class RabbitMQClientFactory 


    /**
     * 创建 rabbit 连接
     * @return
     */
    public static Connection getConnection()


        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 5672 客户端连接端口
        connectionFactory.setPort(5672);
        connectionFactory.setHost("192.168.12.130");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // VirtualHost 是一个路径 相当于mysql中的数据库  文件系统中的文件夹  决定着消息的存储位置
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        try 
            connection = connectionFactory.newConnection();
         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        

        return connection;
    



3、简单模式测试

import com.rabbitmq.client.*;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQTest1 


    private Connection connection;


    @Before // 初始化获取连接
    public void  init()

        //
        connection = RabbitMQClientFactory.getConnection();
    



    // 在rabbitmq 中都要 先启动消费者  在启动生产者  否则有可能报错

    /**
     * 消费者  消费消息
     * @throws IOException
     * @throws TimeoutException
     */
    @Test
    public void customer() throws IOException, TimeoutException 

        // 创建一个channel
        Channel channel = connection.createChannel();


        // 声明当前channel 绑定哪一个队列
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外
        // (conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息
        channel.queueDeclare("hello-queue",true,false,false,null);

        // 当前消费者 每次消费一条数据
        channel.basicQos(1);


        // 创建消费者 在此接受消息
        Consumer consumer = new DefaultConsumer(channel)
            @Override // 在当前方法 方法总接受消息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                //String consumerTag 消费者标记
//                    Envelope envelope  信封
                // byte[] body 消息中的内容
                System.out.println("消费者接受到的消息:"+new String(body,"utf-8"));

            
        ;

        // 向前channel 绑定一个消费者
        //参数1:queue - 指定消费哪个队列 队列名称
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
         // 消费者 接收到 消息之后  会自动告诉 队列,说我已经接受到消息,否则队列还会再次发送刚才发送的消息
        // 确保消费者  一定确认接受到消息

        //参数3:consumer - 指定消费回调
        channel.basicConsume("hello-queue", true,consumer);

        // 等待命令行输入  作用就是让当前程序卡住
        System.in.read();

        // 释放资源
        channel.close();
        connection.close();
    


    /**
     * 生产者 发送消息
     * @throws IOException
     * @throws TimeoutException
     */
    @Test
    public void publisherTest() throws IOException, TimeoutException 

        // 创建一个channel
        Channel channel = connection.createChannel();



        // 发送消息

        // 参数1:指定exchange,使用""。   使用默认交互机
        // 参数2:指定路由的规则,使用具体的队列名称。
        // 取值两种  1.队列名称   2.路由规则

        // 参数3:指定传递的消息所携带的properties,使用null。
        // 参数4:指定发布的具体消息,byte[]类型
        channel.basicPublish("","hello-queue" , null,"hello world".getBytes());
        System.out.println("生产者发送消息");

        channel.close();
        connection.close();
    

注意
1、消费者消费时,必须有ack 机制

//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
// 消费者 接收到 消息之后 会自动告诉 队列,说我已经接受到消息,否则队列还会再次发送刚才发送的消息
// 确保消费者 一定确认接受到消息

2、启动顺序的问题

// 在rabbitmq 中都要 先启动消费者 在启动生产者 否则有可能报错

测试

4、多个消费者共享队列模式(Work)测试

只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing

消费者指定Qoa和手动ack
两个消费者消费同一个消息队列

 @Test
    public void customerTest1() throws IOException 

        //1. 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //2. 创建channel
        final Channel channel = connection.createChannel();


        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息
        channel.queueDeclare("HelloWorld",true,false,false,null);

        // 每次推送只接受1 条消息
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 

                try 
                    Thread.sleep(10000);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                System.out.println("message:"+new String(body,"utf-8"));

                // 手动ack
  // false 是否多条批次 
                channel.basicAck(envelope.getDeliveryTag(),false);

            
        ;

        // 手动ack
        channel.basicConsume("HelloWorld",false,defaultConsumer);

        System.out.println("开始消费");

        System.in.read();

    


    @Test
    public void customerTest2() throws IOException 

        //1. 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //2. 创建channel
        final Channel channel = connection.createChannel();


        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息
        channel.queueDeclare("HelloWorld",true,false,false,null);

        // 每次推送只接受1 条消息
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 

                try 
                    Thread.sleep(10000);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                System.out.println("customerTest2-message:"+new String(body,"utf-8"));

                // 手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);

            
        ;

        // 手动ack
        channel.basicConsume("HelloWorld",false,defaultConsumer);

        System.out.println("开始消费");

        System.in.read();

    

5、发布订阅模式测试

声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。
让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。

public class PublishSubjectTest 



    @Test
    public void PublishTest() throws IOException, TimeoutException 


        // 获取connection
        Connection connection = RabbitMQUtils.getConnection();

        // 创建channel
        Channel channel = connection.createChannel();



        //3. 创建exchange - 绑定某一个队列
        //参数1: exchange的名称
        //参数2: 指定exchange的类型  FANOUT - pubsub ,   DIRECT - Routing , TOPIC - Topics
        // 分散输出
        channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
        channel.queueBind("pubsub-queue1","pubsub-exchange","");
        channel.queueBind("pubsub-queue2","pubsub-exchange","");

        for (int i=0;i<10;i++)
            String message  = "hello" + i;
            channel.basicPublish("pubsub-exchange","",null,message.getBytes());

        

        System.out.println("发布成功");

        channel.close();
        connection.close();



    


    @Test
    public void subjectTest1() throws IOException 


            //1. 获取连接对象
            Connection connection = RabbitMQUtils.getConnection();

            //2. 创建channel
            final Channel channel = connection.createChannel();


            //参数1:queue - 指定队列的名称
            //参数2:durable - 当前队列是否需要持久化(true)
            //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
            //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
            //参数5:arguments - 指定当前队列的其他信息
            channel.queueDeclare("pubsub-queue1",true,false,false,null);

            // 每次推送只接受1 条消息
            channel.basicQos(1);

            DefaultConsumer defaultConsumer = new DefaultConsumer(channel)
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 

                    try 
                        Thread.sleep(10000);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                    System.out.println("消费者1-message:"+new String(body,"utf-8"));

                    // 手动ack
                    channel.basicAck(envelope.getDeliveryTag(),false);

                
            ;

            // 手动ack
            channel.basicConsume("pubsub-queue1",false,defaultConsumer);

            System.out.println("开始消费");

            System.in.read();

    


    @Test
    public void subjectTest2() throws IOException 


        //1. 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //2. 创建channel
        final Channel channel = connection.createChannel();


        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息
        channel.queueDeclare("pubsub-queue2",true,false,false,null);

        // 每次推送只接受1 条消息
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 

                try 
                    Thread.sleep(10000);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                System.out.println("消费者2-message:"+new String(body,"utf-8"));

                // 手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);

            
        ;

        // 手动ack
        channel.basicConsume("pubsub-queue2",false,defaultConsumer);

        System.out.println("开始消费");

        System.in.read();

    

消费者还是正常的监听某一个队列即可。

6、路由模式测试

生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。

//3. 创建exchange, routing-queue-error,routing-queue-info,
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
channel.queueBind("routing-queue-error","routing-exchange","ERROR");
channel.queueBind("routing-queue-info","routing-exchange","INFO");

//4. 发布消息到exchange,同时指定路由的规则
channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());

消费者没有变化

package com.wgz;

import com.rabbitmq.client.*;
import com.wgz.utils.RabbitMQUtils;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RoutingTest 





    @Test  //先启动消费者,在启动生产者
    public void PublishTest() throws IOException, TimeoutException 


        // 获取connection
        Connection connection = RabbitMQUtils.getConnection();

        // 创建channel
        Channel channel = connection.createChannel(消息队列(RabbitMQ)

【rabbitMQ】消息队列之 rabbitMQ

java如何获取rabbitmq队列中消息数量

求助,rabbitmq读取消息队列的问题

RabbitMQ 消息队列

六.RabbitMQ消息队列的基础+实战