RabbitMQ入门前篇

Posted 嘟嘟的程序员铲屎官

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ入门前篇相关的知识,希望对你有一定的参考价值。

本篇博文目录:

一.RabbitMQ

1.消息队列

Message Queue中文意思消息队列,是一种进程的通信机制,用于上下游传递消息。其中在二个进程之间MQ起到消息中间件的作用,实现在进程之间的解耦操作,原来是进程与进程之间直接通信,这样的话耦合性大,并且如果通信失败无法确定那一面出现问题,而通过在中间多一个信息传递就可以实现解耦,并且当有一端出现问题时,也能够知道是那一端出现了问题,保证了数据传输的可靠性。

2.RabbitMQ

RabbitMQ是众多消息代理服务器中使用的较广,较多的一款,RabbitMQ支持几乎所有的操作系统与编程语言,并且Rabbit提供高并发,高可用的成熟方案,支持多种消息协议,易于部署与使用。

下图列出了RabbitMQ与其他MQ的对比图(注意时效性,视频大概是2018出的)

RabbitMQ的应用场景如下:

3.安装RabbitMQ

  • 安装教程

Winddos环境下安装教程: https://www.cnblogs.com/chenwolong/p/rabbitmq.html
Linux环境下安装教程:https://blog.csdn.net/qq_45173404/article/details/116429302

  • Widnos的安装包下载(官网下载太慢)

erlang25.0.1版本:otp_win64_25.0.1.exe
https://www.aliyundrive.com/s/NJwrinH1VNy 提取码: m92c
rabbitmq3.11.7版本: rabbitmq-server-3.11.7.exe
https://www.aliyundrive.com/s/wkfkd7ewzNa 提取码: d43w

  • 安装完毕,启动RabbitMQ管理模块的插件后,访问http://localhost:15672/ 输入账号guest,密码guest,进行登入,会进入如下界面,说明安装成功:

备注:如果无法访问,你可以看看这篇博文:https://itguye.blog.csdn.net/article/details/128770009

4.RabbitMQ常用命令

可以通过下面的命令来操作RabbitMQ,当然也可以在网站上通过图形化的方式进行操作。

rabbitmq-server 前台启动服务
rabbitmq-server -detached 后台启动服务
rabbitmqctl stop 停止服务
rabbitmqctl start_app 启动应用
rabbitmqctl stop_app 终止应用
rabbitmqctl add_user username password – 创建新用户
rabbitmqctl delete_user username – 删除用户
rabbitmqctl change_password username newpassword– 重置密码
rabbitmqctl set_user_tags username tag – 授予用户角色(Tag)
rabbitmqctl set_permissions -p / user_admin'.*' '.*''.*'– 设置用户允许访问的vhost

上面rabbitmqctl set_user_tags username tag命令中的tag,如下:

二.使用RabbitMQ进行编程

1.AMQP

AMQP是一个协议规范,二个不同应用程序只要遵循该协议就可以实现通信,其中RabbitMQ就是AMQP的一种实现方式。

AMQP中的一些知识概念,如生产者,消费者,消息,队列和虚拟主机等,解释如下:

2.第一次MQ通信

  • 添加一个名为/test的虚拟主机

  • 创建一个maven项目,并导入依赖
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>

备注:最新版本为5.16版本: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client/5.16.0

  • 创建utiles工具包,并创建RabbitmqUtils和RabbitConstant类,如下:

RabbitmqUtils类,Rabbit工具类,该工具类就一个方法,就是获取RabbitMq的连接对象Connection :

package utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitmqUtils 
    private static ConnectionFactory connectionFactory = new ConnectionFactory();
        // 静态代码块进行初始化
        static
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/test");
        


        // 获取mq连接对象
        public static Connection getConnection() 
            try 
                Connection connection = connectionFactory.newConnection();
                return connection;
             catch (Exception e) 
              throw  new RuntimeException();
            
        


RabbitConstant类,该类存放RabbitMq的配置常量信息:

package utils;

public class RabbitConstant 
    public static final String QUEUE_HELLOWORLD = "helloworld";
    public static final String QUEUE_SMS = "sms";
    public static final String EXCHANGE_WEATHER = "weather";
    public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
    public static final String QUEUE_BAIDU = "baidu";
    public static final String QUEUE_SINA = "sina";
    public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";



  • 创建一个helloworld的包,在该文件下创建生产者类和消费者类,来实现生产者发送一个helloworld的字符串,然后消费者接受该字符串的操作。

Producer类:首先通过Rabbitmq的工具类获取连接对象,然后通过连接对象创建通道(虚拟连接),接着通过虚拟连接创建一个名为helloworld的队列,并往队列中传递数据hellowrld:

package helloworld;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 */
public class Producer 
    public static void main(String[] args) throws IOException, TimeoutException 
        // 获取连接对象
        Connection connection = RabbitmqUtils.getConnection();
        // 创建通道,虚拟连接
        Channel channel = connection.createChannel();
        // 声明一个队列
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
        String message = "hello world";
        channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null, message.getBytes(StandardCharsets.UTF_8));
        System.out.println("发送数据成功");
        // 关闭虚拟通道
        channel.close();
        // 关闭连接
        connection.close();
    



ConSummer类:通过工具类获取连接,然后创建通道,通过通道使用helloworld队列,并创建一个消费者对象,传入匿名内部类DefaultConsumer,并传入channel对象,获取helloworld消息队列的数据:

package helloworld;


import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;

/**
 * 消费者
 */
public class ConSummer 
    public static void main(String[] args) throws IOException 
            // 获取mq连接
        Connection connection = RabbitmqUtils.getConnection();
        // 创建通道号
        Channel channel = connection.createChannel();
        // 获取队列
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
        // 接收
        channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD,false, new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("接收到的数据:"+new String(body));
                //签收消息,确认消息
                //envelope.getDeliveryTag() 获取这个消息的TagId
                //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            
        );
    


  • 运行效果

  • 消息的状态

三.RabbitMQ六中工作模式

1.RabbitMQ

RabbitMQ六种工作模式中,2~5使用较多,下面会给出实例代码,对于这几种方式存在一定的一致性,都是在方式1的基础上进行添加,功能更加丰富(上文中的案例代码就是Hello World方式)。

2.Work queues

本实例模拟短信通知服务,就是用户购买订单成功后,通过RabbitMQ发送短信给用户进行订单确认,实例代码,首先在helloworld项目中创建一个名为workqueue的包,并创建SMS,Producer,ConSummerOne,ConSummerTwo,ConSummerThree这五个类,详细代码如下:

  • SMS类:实体类
package workqueue;

public class SMS 
    private String name;
    private String mobile;
    private String content;

    public SMS(String name, String mobile, String content) 
        this.name = name;
        this.mobile = mobile;
        this.content = content;
    

    public String getName() 
        return name;
    

    public void setName(String name) 
        this.name = name;
    

    public String getMobile() 
        return mobile;
    

    public void setMobile(String mobile) 
        this.mobile = mobile;
    

    public String getContent() 
        return content;
    

    public void setContent(String content) 
        this.content = content;
    


  • Producer:生产者,用来模拟100个用户同时订票
package workqueue;


import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 */
public class Producer 
    public static void main(String[] args) throws IOException, TimeoutException 
        // 获取连接对象
        Connection connection = RabbitmqUtils.getConnection();
        // 创建通道,虚拟连接
        Channel channel = connection.createChannel();
        // 声明一个队列
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        // 给100给订票的用户发送订票成功的短信信息
        for (int i = 1; i <= 100; i++) 
            SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
            // 将sms转换为jSON对象
            String message = new Gson().toJson(sms);
            channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, message.getBytes(StandardCharsets.UTF_8));
        
        System.out.println("发送成功");
        // 关闭虚拟通道
        channel.close();
        // 关闭连接
        connection.close();

    


  • ConSummerOne类:发送短信处理1
package workqueue;


import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;

/**
 * 消费者
 */
public class ConSummerOne 
    public static void main(String[] args) throws IOException 
            // 获取mq连接
        Connection connection = RabbitmqUtils.getConnection();
        // 创建通道号
        Channel channel = connection.createChannel();
        // 获取队列
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个

        // 接收
        channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("消费者1,进行发送:"+new String(body));
                try 
                    Thread.sleep(100);// 延时0.1s
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                //签收消息,确认消息
                //envelope.getDeliveryTag() 获取这个消息的TagId
                //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            
        );
    


  • ConSummerTwo类:发送短信2
package workqueue;


import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;

import java.io.IOException;

/**
 * 消费者
 */
public class ConSummerTwo 
    public static void main(String[] args) throws IOException 
            // 获取mq连接
        Connection connection = RabbitmqUtils.getConnection();
        // 创建通道号
        Channel channel = connection.createChannel();
        // 获取队列
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个
        // 接收
        channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("消费者2,进行发送:"+new String(body));
                try 
                    Thread.sleep(100);// 延时0.1s
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                //签收消息,确认消息
                //envelope.getDeliveryTag() 获取这个消息的TagId
                //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
                channel.basicAck(envelope.getDeliveryTag(),falseRabbitMQ高级进阶(2021.05.29)

java - RabbitMQ 消费者不接收消息

SpringBoot整合RabbitMQ(2021.05.29)

SpringBoot整合RabbitMQ(2021.05.29)

ctfshow web入门 反序列化 前篇 254-266

Vue笔记:基础入门(前篇)