RabbitMQ 小记

Posted lianzuo123

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 小记相关的知识,希望对你有一定的参考价值。

RabbitMQ中的交换机类型

  • Direct
  • Fanout
  • Topic
  • headers

Direct Exchange(直连交换机)

直连交换机的特点是消息队列通过routingKey与交换机进行绑定,相同的routingKey会获得相同的消息。一个队列可以通过多个不同的routingKey与交换机进行绑定。不同的队列也可以通过相同的routingKey绑定交换机。

Fanout Exchange(扇出交换机)

扇出交换机的特点是类似于广播,只要队列与该类型的交换机绑定,所有发送到该交换机的信息都会被转发到所有与之绑定的队列,与routingKey无关。

Topic Exchange(主题交换机)

应用范围最广的交换机类型,消息队列通过消息主题与交换机绑定。一个队列可以通过多个主题与交换机绑定,多个消息队列也可以通过相同消息主题和交换机绑定。并且可以通过通配符(*或者#)进行多个消息主题的适配。
消息主题的一般格式为xxx.xxx.xxx(x为英文字母,每个单词用英文句号隔开)。*通配符可以适配一个单词,#可以适配零个或者多个单词。
通配符适配如下:
*.xxx.#。此主题可以适配xxx前面只有一个单词后面有零个或者多个单词的所有消息主题。

* 可以匹配一个单词
# 可以匹配多个

如  路由键 a.b.c.d   	
 a.*.c.*  可以匹配
 a.#       可以匹配

Header Exchenge(头交换机)

与routingKey无关,匹配机制是匹配消息头中的属性信息。在绑定消息队列与交换机之前声明一个map键值对,通过这个map对象实现消息队列和交换机的绑定。当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。
匹配规则x-match有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息

常用模式

  • 简单模式
    • 最简单的模式,就1个消费者负责处理信息

  • 工作模式
    • 最常用的模式,多个消费者,同时处理,可以更加合理的提升处理速度

  • 广播模式
    • 同一条消息,会被推送到多个与之绑定的队列,通常用于群发消息

  • 路由模式
    • 通常用于订阅数据,错误/异常通知,可以根据路由键来过滤,只获取自己需要的内容

  • 主题模式
    • 类似于路由模式,只是路由键支持通配符

各种模式,无非就是交换机类型与路由键,队列参数之前的各种设置,根据实际业务需求,来合理规划,可以更好的使用!

各种模式的配置规则

参考代码(使用原生API来演示,以更好理解底层运作):

依赖

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.4.3</version>
</dependency>

工具类

package com.example.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @version 0.0.1
 * @Author: LianZuoYang
 * @Create: 2022-08-27 10:28
 */
public class Utils 
    public static Channel getChannel() throws IOException, TimeoutException 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.64.140");//MQ的服务器地址
        factory.setPort(5672);//MQ的端口
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();//通信通道
        return channel;
    


简单模式

生产者

package com.example.rabbitmq.m1;

import com.example.rabbitmq.Utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * 简单模式,1个收,1个发
 * @version 0.0.1
 * @Author: LianZuoYang
 * @Create: 2022-08-27 09:49
 */
public class Producer 
    public static void main(String[] args) throws IOException, TimeoutException 
        Scanner scanner = new Scanner(System.in);


        String queueName = "HelloWord";
        //连接RabbitMQ
        Channel channel = Utils.getChannel();//通信通道
        //创建队列(如已存在则不创建)

        //持久化:会将队列信息保存到硬盘,非持久化:只在内存里保存队列信息
        //独占:只能被一个消费者消费,非独占:可以被多个消费者共享
        //自动删除:没有消费者时,服务器自动删除队列,不自动删除:没有消费者时,服务器不自动删除队列
        //其他参数:一些队列的其他可选参数

        //参数列表            队列名       是否持久化     是否独占         是否自动删除       其他参数
        channel.queueDeclare(queueName, false, false, false, null);
        //向队列发送数据
        byte[] bytes = "test message".getBytes();


        //参数列表        交换机(空串是默认)   队列名   消息的其他参数    消息内容
        //channel.basicPublish("", queueName, null, bytes);

        while (true)
            String s = scanner.nextLine();
            channel.basicPublish("", queueName, null, s.getBytes());
        
    


消费者

package com.example.rabbitmq.m1;

import com.example.rabbitmq.Utils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 简单模式,1个收,1个发
 * @version 0.0.1
 * @Author: LianZuoYang
 * @Create: 2022-08-27 10:29
 */
public class Consumer 
    public static void main(String[] args) throws IOException, TimeoutException 
        String queueName = "HelloWord";
        //连接RabbitMQ
        Channel channel = Utils.getChannel();//通信通道

        //消费者也创建一下队列,可以让消费者跟生产者的启动顺序灵活变动,谁先启动,谁创建
        //创建队列(如已存在则不创建)
        channel.queueDeclare(queueName, false, false, false, null);

        //处理消息的回调对象
        DeliverCallback deliverCallback = new DeliverCallback() 
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException 
                byte[] body = message.getBody();
                String msg = new String(body);
                System.out.println("收到消息: " + msg);
            
        ;
        //消费者取消接收消息的回调对象(不再从队列接收消息的回调)
        CancelCallback cancelCallback = new CancelCallback() 
            @Override
            public void handle(String consumerTag) throws IOException 

            
        ;

        //从队列持续的接收消息,消息需要传递到一个回调对象进行处理
        //参数true为自动确认,服务端发送消息给消费者以后,就删除了服务端的消息.
        //推荐使用false手动确认,需要消费者处理完消息后,手动发送确认消息,让服务端删除消息,这样不会造成消息丢失
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);


    


工作者模式

生产者

package com.example.rabbitmq.m2;

import com.example.rabbitmq.Utils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * 工作模式,多个收,1个发
 * @version 0.0.1
 * @Author: LianZuoYang
 * @Create: 2022-08-27 10:55
 */
public class Producer 
    public static void main(String[] args) throws IOException, TimeoutException 
        Scanner scanner = new Scanner(System.in);

        String queueName = "task_queue";
        //连接RabbitMQ
        Channel channel = Utils.getChannel();//通信通道
        //创建队列(如已存在则不创建)

        //持久化:会将队列信息保存到硬盘,非持久化:只在内存里保存队列信息
        //独占:只能被一个消费者消费,非独占:可以被多个消费者共享
        //自动删除:没有消费者时,服务器自动删除队列,不自动删除:没有消费者时,服务器不自动删除队列
        //其他参数:一些队列的其他可选参数

        //参数列表            队列名       是否持久化     是否独占         是否自动删除       其他参数
        channel.queueDeclare(queueName, true, false, false, null);
        //向队列发送数据
        byte[] bytes = "test message".getBytes();


        //参数列表        交换机(空串是默认)   队列名   消息的其他参数    消息内容
        //channel.basicPublish("", queueName, null, bytes);


        //持久化的消息
        AMQP.BasicProperties persistentBasic = MessageProperties.PERSISTENT_BASIC;
        while (true)
            String s = scanner.nextLine();
            channel.basicPublish("", queueName, persistentBasic, s.getBytes());
        
    


消费者

package com.example.rabbitmq.m2;

import com.example.rabbitmq.Utils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作模式,多个收,1个发
 *
 * @version 0.0.1
 * @Author: LianZuoYang
 * @Create: 2022-08-27 10:29
 */
public class Consumer 
    public static void main(String[] args) throws IOException, TimeoutException 
        //启动2个消费者.
        //此模式是轮询的,按照顺序发送,可能造成某个消费者繁忙的时候,还会分配消息给他
        //比如有消息1-10,消费者a,b
        //那么a会处理1,3,5,7,9
        //b会处理2,4,6,8,10
        for (int i = 0; i < 2; i++) 
            createConsumer();
        
    

    private static void createConsumer() throws IOException, TimeoutException 
        String queueName = "task_queue";
        Channel channel = Utils.getChannel();//通信通道
        channel.queueDeclare(queueName, false, false, false, null);
        //处理消息的回调对象
        DeliverCallback deliverCallback = new DeliverCallback() 
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException 
                byte[] body = message.getBody();
                String msg = new String(body);
                for (int j = 0; j < msg.length(); j++) 
                    if (msg.charAt(j) == '.') //碰到.就等待1秒
                        try 
                            Thread.sleep(1000);
                         catch (InterruptedException e) 
                            throw new RuntimeException(e);
                        
                    
                
                System.out.println(consumerTag + " >> 处理完毕:" + msg);
                //因为basicConsume里设置了autoAck为false,需要手动发送确认回执
                //获取消息的编号
                long deliveryTag = message.getEnvelope().getDeliveryTag();
                //处理完成,发送消息处理完毕的回执
                channel.basicAck(deliveryTag,false);
            
        ;
        //消费者取消接收消息的回调对象(不再从队列接收消息的回调)
        CancelCallback cancelCallback = new CancelCallback() 
            @Override
            public void handle(String consumerTag) throws IOException 

            
        ;

        //设置Qos为1,既一次只收1条消息,在消息处理完之前,不接收其他消息,保证消息的合理分配
        channel.basicQos(1);

        //从队列持续的接收消息,消息需要传递到一个回调对象进行处理
        //参数true为自动确认,服务端发送消息给消费者以后,就删除了服务端的消息.
        //推荐使用false手动确认,需要消费者处理完消息后,手动发送确认消息,让服务端删除消息,这样不会造成消息丢失
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
    


广播/群发 模式

生产者

package com.example.rabbitmq.fanout;

import com.example.rabbitmq.Utils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * 广播/群发模式,交换机类型使用Fanout
 * 生产者只需要往交换机内投递消息,每个绑定了此交换机的消费者都会收到消息
 * @version 0.0.1
 * @Author: LianZuoYang
 * @Create: 2022-08-27 10:55
 */
public class Producer 
    public static void main(String[] args) throws IOException, TimeoutException 
        Scanner scanner = new Scanner(System.in);
        //自定义一个交换机名称(需要跟消费者那边一致)
        String exchangeName = "logs";
        //连接RabbitMQ
        Channel channel = Utils.getChannel();//通信通道
        //创建交换机 交换机类型为Fanout,此交换机是广播模式的,即所有绑定了此交换机的队列都会收到消息
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);


        while (true) 
            System.out.print("输入消息:");
            String s = scanner.nextLine();
            //向交换机发送数据
            channel.basicPublish(exchangeName, "", null, s.getBytes());
        
    


消费者

package com.example.rabbitmq.fanout;

import com.example.rabbitmq.Utils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 广播/群发模式
 * 消费者需要自己创建队列,绑定到指定的Fanout类型交换机上,即可收到消息
 * @version 0.0.1
 * @Author: LianZuoYang
 * @Create: 2022-08-27 10:29
 */
public class Consumer 
    public static void main(String[] args) throws IOException, TimeoutException 
        //启动3个消费者.
        for (int i = 1; i < 4; i++) 
            createConsumer(i);
        
    

    private static void createConsumer(int index) throws IOException, TimeoutException 
        //自定义一个交换机名称(需要跟生产者那边一致)
        String exchangeName = "logs";

        String queueName = "queue_" + index; //在此模式下,队列名应当使用随机名称,此处为了方便观察,用index生成
        Channel channel = Utils.getChannel();//通信通道
        //创建交换机(可能消费者先启动了,那么就由消费者创建交换机)
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);

        //创建自己的队列
        //                                  非持久化,       独占,         自动删除
        channel.queueDeclare(queueName, false, true, true, null);

        //将队列绑定到绑定到logs交换机上
        //第三个参数对Fanout模式无效
        channel.queueBind(queueName, exchangeName, "");


        //处理消息的回调对象
        DeliverCallback deliverCallback = new DeliverCallback() 
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException 
                byte[] body = message.getBody();
                String msg = new String(body)RabbitMQ小记

RabbitMQ小记

RabbitMQ小记

RabbitMQ 小记

RabbitMQ小记

Java 小记 — RabbitMQ 的实践与思考