RabbitMQ 小记
Posted lianzuo123
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 小记相关的知识,希望对你有一定的参考价值。
RabbitMQ中的交换机类型
- Direct
- Fanout
- Topic
- headers
Direct Exchange(直连交换机)
直连交换机的特点是消息队列通过routingKey与交换机进行绑定,相同的routingKey会获得相同的消息。一个队列可以通过多个不同的routingKey与交换机进行绑定。不同的队列也可以通过相同的routingKey绑定交换机。
Fanout Exchange(扇出交换机)
扇出交换机的特点是类似于广播,只要队列与该类型的交换机绑定,所有发送到该交换机的信息都会被转发到所有与之绑定的队列,与routingKey无关。
Topic Exchange(主题交换机)
应用范围最广的交换机类型,消息队列通过消息主题与交换机绑定。一个队列可以通过多个主题与交换机绑定,多个消息队列也可以通过相同消息主题和交换机绑定。并且可以通过通配符(*或者#)进行多个消息主题的适配。
消息主题的一般格式为xxx.xxx.xxx(x为英文字母,每个单词用英文句号隔开)。*通配符可以适配一个单词,#可以适配零个或者多个单词。
通配符适配如下:
*.xxx.#。此主题可以适配xxx前面只有一个单词后面有零个或者多个单词的所有消息主题。
* 可以匹配一个单词
# 可以匹配多个
如 路由键 a.b.c.d
a.*.c.* 可以匹配
a.# 可以匹配
Header Exchenge(头交换机)
与routingKey无关,匹配机制是匹配消息头中的属性信息。在绑定消息队列与交换机之前声明一个map键值对,通过这个map对象实现消息队列和交换机的绑定。当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。
匹配规则x-match有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息
常用模式
- 简单模式
-
最简单的模式,就1个消费者负责处理信息
- 工作模式
-
最常用的模式,多个消费者,同时处理,可以更加合理的提升处理速度
- 广播模式
-
同一条消息,会被推送到多个与之绑定的队列,通常用于群发消息
- 路由模式
-
通常用于订阅数据,错误/异常通知,可以根据路由键来过滤,只获取自己需要的内容
- 主题模式
-
类似于路由模式,只是路由键支持通配符
各种模式,无非就是交换机类型与路由键,队列参数之前的各种设置,根据实际业务需求,来合理规划,可以更好的使用!
各种模式的配置规则
参考代码(使用原生API来演示,以更好理解底层运作):
依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
工具类
package com.example.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @version 0.0.1
* @Author: LianZuoYang
* @Create: 2022-08-27 10:28
*/
public class Utils
public static Channel getChannel() throws IOException, TimeoutException
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.64.140");//MQ的服务器地址
factory.setPort(5672);//MQ的端口
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();//通信通道
return channel;
简单模式
生产者
package com.example.rabbitmq.m1;
import com.example.rabbitmq.Utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* 简单模式,1个收,1个发
* @version 0.0.1
* @Author: LianZuoYang
* @Create: 2022-08-27 09:49
*/
public class Producer
public static void main(String[] args) throws IOException, TimeoutException
Scanner scanner = new Scanner(System.in);
String queueName = "HelloWord";
//连接RabbitMQ
Channel channel = Utils.getChannel();//通信通道
//创建队列(如已存在则不创建)
//持久化:会将队列信息保存到硬盘,非持久化:只在内存里保存队列信息
//独占:只能被一个消费者消费,非独占:可以被多个消费者共享
//自动删除:没有消费者时,服务器自动删除队列,不自动删除:没有消费者时,服务器不自动删除队列
//其他参数:一些队列的其他可选参数
//参数列表 队列名 是否持久化 是否独占 是否自动删除 其他参数
channel.queueDeclare(queueName, false, false, false, null);
//向队列发送数据
byte[] bytes = "test message".getBytes();
//参数列表 交换机(空串是默认) 队列名 消息的其他参数 消息内容
//channel.basicPublish("", queueName, null, bytes);
while (true)
String s = scanner.nextLine();
channel.basicPublish("", queueName, null, s.getBytes());
消费者
package com.example.rabbitmq.m1;
import com.example.rabbitmq.Utils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 简单模式,1个收,1个发
* @version 0.0.1
* @Author: LianZuoYang
* @Create: 2022-08-27 10:29
*/
public class Consumer
public static void main(String[] args) throws IOException, TimeoutException
String queueName = "HelloWord";
//连接RabbitMQ
Channel channel = Utils.getChannel();//通信通道
//消费者也创建一下队列,可以让消费者跟生产者的启动顺序灵活变动,谁先启动,谁创建
//创建队列(如已存在则不创建)
channel.queueDeclare(queueName, false, false, false, null);
//处理消息的回调对象
DeliverCallback deliverCallback = new DeliverCallback()
@Override
public void handle(String consumerTag, Delivery message) throws IOException
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("收到消息: " + msg);
;
//消费者取消接收消息的回调对象(不再从队列接收消息的回调)
CancelCallback cancelCallback = new CancelCallback()
@Override
public void handle(String consumerTag) throws IOException
;
//从队列持续的接收消息,消息需要传递到一个回调对象进行处理
//参数true为自动确认,服务端发送消息给消费者以后,就删除了服务端的消息.
//推荐使用false手动确认,需要消费者处理完消息后,手动发送确认消息,让服务端删除消息,这样不会造成消息丢失
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
工作者模式
生产者
package com.example.rabbitmq.m2;
import com.example.rabbitmq.Utils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* 工作模式,多个收,1个发
* @version 0.0.1
* @Author: LianZuoYang
* @Create: 2022-08-27 10:55
*/
public class Producer
public static void main(String[] args) throws IOException, TimeoutException
Scanner scanner = new Scanner(System.in);
String queueName = "task_queue";
//连接RabbitMQ
Channel channel = Utils.getChannel();//通信通道
//创建队列(如已存在则不创建)
//持久化:会将队列信息保存到硬盘,非持久化:只在内存里保存队列信息
//独占:只能被一个消费者消费,非独占:可以被多个消费者共享
//自动删除:没有消费者时,服务器自动删除队列,不自动删除:没有消费者时,服务器不自动删除队列
//其他参数:一些队列的其他可选参数
//参数列表 队列名 是否持久化 是否独占 是否自动删除 其他参数
channel.queueDeclare(queueName, true, false, false, null);
//向队列发送数据
byte[] bytes = "test message".getBytes();
//参数列表 交换机(空串是默认) 队列名 消息的其他参数 消息内容
//channel.basicPublish("", queueName, null, bytes);
//持久化的消息
AMQP.BasicProperties persistentBasic = MessageProperties.PERSISTENT_BASIC;
while (true)
String s = scanner.nextLine();
channel.basicPublish("", queueName, persistentBasic, s.getBytes());
消费者
package com.example.rabbitmq.m2;
import com.example.rabbitmq.Utils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作模式,多个收,1个发
*
* @version 0.0.1
* @Author: LianZuoYang
* @Create: 2022-08-27 10:29
*/
public class Consumer
public static void main(String[] args) throws IOException, TimeoutException
//启动2个消费者.
//此模式是轮询的,按照顺序发送,可能造成某个消费者繁忙的时候,还会分配消息给他
//比如有消息1-10,消费者a,b
//那么a会处理1,3,5,7,9
//b会处理2,4,6,8,10
for (int i = 0; i < 2; i++)
createConsumer();
private static void createConsumer() throws IOException, TimeoutException
String queueName = "task_queue";
Channel channel = Utils.getChannel();//通信通道
channel.queueDeclare(queueName, false, false, false, null);
//处理消息的回调对象
DeliverCallback deliverCallback = new DeliverCallback()
@Override
public void handle(String consumerTag, Delivery message) throws IOException
byte[] body = message.getBody();
String msg = new String(body);
for (int j = 0; j < msg.length(); j++)
if (msg.charAt(j) == '.') //碰到.就等待1秒
try
Thread.sleep(1000);
catch (InterruptedException e)
throw new RuntimeException(e);
System.out.println(consumerTag + " >> 处理完毕:" + msg);
//因为basicConsume里设置了autoAck为false,需要手动发送确认回执
//获取消息的编号
long deliveryTag = message.getEnvelope().getDeliveryTag();
//处理完成,发送消息处理完毕的回执
channel.basicAck(deliveryTag,false);
;
//消费者取消接收消息的回调对象(不再从队列接收消息的回调)
CancelCallback cancelCallback = new CancelCallback()
@Override
public void handle(String consumerTag) throws IOException
;
//设置Qos为1,既一次只收1条消息,在消息处理完之前,不接收其他消息,保证消息的合理分配
channel.basicQos(1);
//从队列持续的接收消息,消息需要传递到一个回调对象进行处理
//参数true为自动确认,服务端发送消息给消费者以后,就删除了服务端的消息.
//推荐使用false手动确认,需要消费者处理完消息后,手动发送确认消息,让服务端删除消息,这样不会造成消息丢失
channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
广播/群发 模式
生产者
package com.example.rabbitmq.fanout;
import com.example.rabbitmq.Utils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* 广播/群发模式,交换机类型使用Fanout
* 生产者只需要往交换机内投递消息,每个绑定了此交换机的消费者都会收到消息
* @version 0.0.1
* @Author: LianZuoYang
* @Create: 2022-08-27 10:55
*/
public class Producer
public static void main(String[] args) throws IOException, TimeoutException
Scanner scanner = new Scanner(System.in);
//自定义一个交换机名称(需要跟消费者那边一致)
String exchangeName = "logs";
//连接RabbitMQ
Channel channel = Utils.getChannel();//通信通道
//创建交换机 交换机类型为Fanout,此交换机是广播模式的,即所有绑定了此交换机的队列都会收到消息
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);
while (true)
System.out.print("输入消息:");
String s = scanner.nextLine();
//向交换机发送数据
channel.basicPublish(exchangeName, "", null, s.getBytes());
消费者
package com.example.rabbitmq.fanout;
import com.example.rabbitmq.Utils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 广播/群发模式
* 消费者需要自己创建队列,绑定到指定的Fanout类型交换机上,即可收到消息
* @version 0.0.1
* @Author: LianZuoYang
* @Create: 2022-08-27 10:29
*/
public class Consumer
public static void main(String[] args) throws IOException, TimeoutException
//启动3个消费者.
for (int i = 1; i < 4; i++)
createConsumer(i);
private static void createConsumer(int index) throws IOException, TimeoutException
//自定义一个交换机名称(需要跟生产者那边一致)
String exchangeName = "logs";
String queueName = "queue_" + index; //在此模式下,队列名应当使用随机名称,此处为了方便观察,用index生成
Channel channel = Utils.getChannel();//通信通道
//创建交换机(可能消费者先启动了,那么就由消费者创建交换机)
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT);
//创建自己的队列
// 非持久化, 独占, 自动删除
channel.queueDeclare(queueName, false, true, true, null);
//将队列绑定到绑定到logs交换机上
//第三个参数对Fanout模式无效
channel.queueBind(queueName, exchangeName, "");
//处理消息的回调对象
DeliverCallback deliverCallback = new DeliverCallback()
@Override
public void handle(String consumerTag, Delivery message) throws IOException
byte[] body = message.getBody();
String msg = new String(body)RabbitMQ小记