RabbitMQ-精简版

Posted 小新兜兜有糖

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ-精简版相关的知识,希望对你有一定的参考价值。

RabbitMQ-精简版

声明: 本博客已标明出处,如有侵权请告知,马上删除。

RabbitMQ 相关概述:

RabbitMQ是微服务间实现异步通讯的一种技术.

RabbitMQ中的一些角色:

  • publisher:生产者
  • consumer:消费者
  • exchange个:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

入门案例

生产者->发消息

public class PublisherTest 
    @Test
    public void testSendMessage() throws IOException, TimeoutException 
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.136.134");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("jd");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    

消费者->收消息

public class ConsumerTest 

    public static void main(String[] args) throws IOException, TimeoutException 
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.136.134");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("jd");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException 
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            
        );
        System.out.println("等待接收消息。。。。");
    

SpringAMQP-简单消息

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

SpringAMQP的简单消息没有交换机角色.

准备工作

1.导入坐标

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置地址

spring:
  rabbitmq:
    host: 192.168.136.134 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: jd # 用户名
    password: 123321 # 密码
SimpleQueue

简单消息: 一个队列对应一个消费者

生产者-发消息
@SpringBootTest
public class SpringAmqpTest 

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() 
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    

消费者-收消息
@Component
public class SpringRabbitListener 
	
    //queuesToDeclare: 队列名称,如果队列不存在,则会自动创建
    @RabbitListener(queuesToDeclare = @Queue("simple.queue"))
    public void listenSimpleQueueMessage(String msg) throws InterruptedException 
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    

WorkQueue

Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

生产者-发消息
@Test
public void testWorkQueue() throws InterruptedException 
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) 
        // 发送消息---> 唯一ID
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    

消费者-收消息
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException 
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);


@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException 
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);

相关补充

多个消费者绑定到一个队列,消息默认会被平均分配到每个消费者上, 同一条消息只会被一个消费者处理
在消费者端配置spring.rabbitmq.listener.simple.prefetch=1,控制消费者预取的消息数量(也就是消费完毕后再去取的数量)

SpringAMQP-发布/订阅

在订阅模型中,多了一个exchange角色:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,决定如何处理消息。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

注意:

  • Exchange只负责转发消息,不具备存储消息的能力
  • 如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
声明队列和交换机
基于@Bean配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 声明"队列"和"交换机"
 * 交换机的类型可以切换: FanoutExchange,DirectExchange,TopicExchange.
 */
@Configuration
public class FanoutConfig 
    
    /**
     * 声明交换机-jd.fanout
     * 交换机的类型: FanoutExchange
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() 
        return new FanoutExchange("jd.fanout");
    

    /**
     * 声明队列-fanout.queue1
     * @return
     */
    @Bean
    public Queue fanoutQueue1() 
        return new Queue("fanout.queue1");
    

    /**
     * 绑定队列1到交换机
     * @param fanoutQueue1 队列1,参数名字必须和fanoutQueue1的方法名保持一致(原因在于bean的名字)
     * @param fanoutExchange 交换机,参数名必须和fanoutExchange方法名保持一致
     * @return
     */
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) 
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    

    /**
     * 声明队列-fanout.queue2
     * @return
     */
    @Bean
    public Queue fanoutQueue2() 
        return new Queue("fanout.queue2");
    

    /**
     * 绑定队列2到交换机
     * @param fanoutQueue2 队列2,参数名字必须和fanoutQueue2的方法名保持一致(原因在于bean的名字)
     * @param fanoutExchange 交换机,参数名必须和fanoutExchange方法名保持一致
     * @return
     */
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) 
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    

基于注解配置
// 在消息的消费方上,直接绑定
// 通过@RabbitListener注解, 直接定义"交换机"和"队列", 并绑定他们的关系
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "队列名"),
    exchange = @Exchange(name = "交换机名", type = ExchangeTypes.交换机类型)
))
FanoutExchangeQueue

概述:

Fanout:广播,将消息交给所有绑定到交换机的队列.

假设:

已通过"@Bean配置"完成了"队列"和"交换机"的配置.

交换机名:  jd.fanout

队列名: fanout.queue1,  fanout.queue2		
生产者-发消息
@Test
public void testSendFanoutExchange() 
    // 交换机名称
    String exchangeName = "jd.fanout";
    // 消息
    String message = "hello, every one!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "", message);

消费者-收消息
/**
 * FanoutQueue: 可以理解为是广播模式
 * 所有和Fanout交换机绑定的消息队列,都会接受到交换机中的所有消息
 */
@Component
public class FanoutQueueListener 

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) 
        System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
    

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) 
        System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
    


DirectExchangeQueue

概述:

要根据Routing Key进行判断,队列的Routingkey与消息的 Routing key完全一致时,才会接收到消息

假设:

没有配置"交换机"和"队列"信息	
生产者-发消息
@Test
public void testSendDirectExchange() 
    // 交换机名称
    String exchangeName = "jd.direct";
    // 消息
    String message = "hello, red!";
    // 发送消息(第二个参数就是RoutingKey)
    rabbitTemplate.convertAndSend(exchangeName, "red", message);

消费者-收消息
/**
 * 根据key获取要消费的消息
 */
@Component
public class DirectQueueListener 

    //定义"交换机-jd.direct"
    //定义"队列-direct.queue1"
    //绑定"交换机"和"队列"
    //绑定key. 
    //要求: 消息的RoutingKey是red或blue,才执行
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "jd.direct", type = ExchangeTypes.DIRECT),
            key = "red", "blue"
    ))
    public void listenDirectQueue1(String msg)
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    

    //要求: 消息的RoutingKey是red或yellow,才执行
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "jd.direct", type = ExchangeTypes.DIRECT),
            key = "red", "yellow"
    ))
    public void listenDirectQueue2(String msg)
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    

TopicExchangeQueue

概述:

Routing key 可以使用通配符的DirectExchangeQuery.

通配符:

#:匹配一个或多个词

*:匹配不多不少恰好1个词
生产者-发消息
@Test
public void testSendTopicExchange() 
    // 交换机名称
    String exchangeName = "jd.topic";
    // 消息
    String message = "今天天气不错,我的心情好极了!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);

消费者-收消息
/**
 * 支持通配符的FanoutQueue
 */
@Component
public class TopicQueueListener 

    //定义"交换机-jd.topic"
    //定义"队列-topic.queue1"
    //绑定"交换机"和"队列"
    //绑定key. 
    //要求: 消息的RoutingKey是以china.开头的,才执行
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "jd.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg)
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    
    
    

    //要求: 消息的RoutingKey是以.news开头的,才执行
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "jd.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg)
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    

消息转换器

Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

默认情况下Spring采用的序列化方式是JDK序列化. 我们可以修改其默认转换方式

添加依赖
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
配置转换器
@Bean
public MessageConverter jsonMessageConverter()
    return new Jackson2JsonMessageConverter();

RabbitMQ 安装——RPM 和 TAR 两种方式

RPM 方式安装

准备条件:
  为安装准备一些基本工具,有些 linux 服务器可能是精简安装。
  yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
  我的机器在虚拟机中安装时选了一些组件(中文安装提示,其实我也无法确定是不是已经包含了所需的东西),rpm 过程中没有提示缺少包,因此我略过了这一步
第一步:下载 erlang、socat、rabbitmq
  wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
  wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
  wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
第二步:安装
  rabbitmq 最后一个安装
  rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
  rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
  rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
第三步:启用 web 管控台插件
  rabbitmq-plugins enable rabbitmq_management
第四步:调整 guest 账户登录限制
  修改:vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
  修改 loopback_users 中的 [<<"guest">>], 只保留 []
  其他修改如
    修改:heartbeat 为 5,单位为秒
第五步:启动/停止 rabbitmq
  rabbitmq-server 命令:/etc/init.d/rabbitmq-server start stop status restart
  使用 rabbitmq-server start & 以后台方式启动 rabbitmq
  停止 rabbitmq 服务:rabbitmqctl stop_app
    通常会失败... 用 ps -ef|grep rabbit 找出进程号 kill -9 强杀
    当出现 node rabbit is running 警告时也用 kill -9 解决
其他配置/操作:
  添加自定义配置文件:/etc/rabbitmq/rabbitmq.config
RPM安装方式的默认日志路径
  /var/log/rabbitmq
  目录下有两个文件
    rabbit@node1.log
    rabbit@node1-sasl.log
  在 broker 启动时有输出提示


访问 web 管控台
  个人学习时建议直接关闭防火墙,没必要单独开放端口
  systemctl stop firewalld.service
  只是暂时关闭防火墙,系统重启后需要再手动关闭
访问URL:
  http://192.168.33.51:15672
    ip 替换为自己的
  修改了 guest 账户的 IP 访问限制后,后续可以通过管控台添加用户,配置(administrator 或 management)权限即可

 

 

tar 方式安装(同《RabbitMQ实战指南》安装版本)

分别到官网下载 erlang、rabbitmq 安装包
  erlang 安装包:

    otp_src_19.3.tar.gz
  rabbitmq 安装包:
    rabbitmq-server-generic-unix-3.6.10.tar.xz
第一步:安装 erlang
  tar zxvf otp_src_19.3.tar.gz
  cd otp_src_19.3
  ./configure --prefix=/opt/erlang
    如果这一步发生 error,且提示为“No curses library functions found”,则需要安装 ncurses:
    yum install ncurses-devel
      安装完后重新 ./configure
    其他缺失的工具:
    yum install openssl openssl-devel unixODBC unixODBC-devel gcc gcc-c++
    gcc 编译器可以代替 javac 编译器,./configure 时 disable 信息中会有提示需要 javac 编译器,可以用 gcc 代替
    PS:前面 RPM 安装方式中有一个更全的 yum install 列表,其中也包含了上述工具,但更全,不放心的话可以直接使用 RPM 安装中提供的 yum install 列表
    yum 安装缺失的工具后要重新 ./configure --prefix=/opt/erlang
  make
  make install
  不出意外的话,安装成功
第二步:手动添加 erlang 环境变量
  vi /etc/profile
  添加以下变量:
    export ERLANG_HOME=/opt/erlang
    export PATH=$PATH:$ERLANG_HOME/bin
  source /etc/profile
检验 erlang 是否安装成功:
  输入 erl 命令,看到以下输出的话表示安装成功:
  Erlang/OTP 19 [erts-8.3] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]
第三步:安装 RabbitMQ
  安装包 rabbitmq-server-generic-unix-3.6.10.tar.xz 为 xz,需要先用 xz 工具解压,获得 .tar 压缩文件
  xz -d rabbitmq-server-generic-unix-3.6.10.tar.xz
  tar -xvf rabbitmq-server-generic-unix-3.6.10.tar -C /opt
  cd /opt
  mv rabbitmq_server-3.6.10 rabbitmq
  至此解压完成
第四步:手动添加 RabbitMQ 环境变量
  vi /etc/profile
  添加以下变量:
    export RABBITMQ_HOME=/opt/rabbitmq
    export PATH=$PATH:$RABBITMQ_HOME/sbin
  source /etc/profile
检验 rabbitmq 是否安装成功:
  以后守护进程方式启动 rabbitmq
    rabbitmq-server -detached
  rabbitmqctl status
    查看 rabbitmq 节点状态,有正常信息输出表示安装、启动成功
取消 guest 用户 localhost 登录限制
  vi /opt/rabbitmq/ebin/rabbit.app
    输入查找命令:
      ?loopback
    将 [<<"guest">>] 改为 []
日志目录
########## Logs: /opt/rabbitmq/var/log/rabbitmq/rabbit@node-0.log
###### ## /opt/rabbitmq/var/log/rabbitmq/rabbit@node-0-sasl.log

以上是关于RabbitMQ-精简版的主要内容,如果未能解决你的问题,请参考以下文章

精简总结redis/rabbitmq/zookeeper在linux centos7上的安装

rabbitmq 消费 json 消息并转换成 Java 对象

RabbitMQ 安装——RPM 和 TAR 两种方式

NodeJS 构建 Docker精简版镜像

win7精简版缺少speechsdk

什么是win7精简版?