笔记15:RabbitMQ工作模式案例及可靠性队列
Posted Lossdate
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了笔记15:RabbitMQ工作模式案例及可靠性队列相关的知识,希望对你有一定的参考价值。
一、写在开头
-
RabbitMQ交换器有:direct 、 topic 、 headers 和 fanout 四种类型
-
POM:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency> </dependencies>
二、使用默认交换器
使用默认交换器,可以不绑定交换器
- Producer
public class Producer public static void main(String[] args) throws Exception ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.200.136"); factory.setVirtualHost("/"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("123456"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare( //队列名 "queue.default.ex", //是否持久化 false, //是否只能自己的连接才能使用 false, //是否自动删除 false, //有没有属性 null); //在发送消息的时候没有指定交换器的名字,此时使用的是默认的交换器,默认交换器就没有名字 //路由键就是目的地消息队列的名字 channel.basicPublish( //exchange名字 "", //路由键 "queue.default.ex", null, ("hello default").getBytes(StandardCharsets.UTF_8)); channel.close(); connection.close();
- Consumer
public class Consumer public static void main(String[] args) throws Exception ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); GetResponse getResponse = channel.basicGet("queue.default.ex", true); System.out.println("收到的消息:" + new String(getResponse.getBody())); channel.close(); connection.close();
- 验证
启动producer,生产消息,之后启动consumer
三、Work Queue模式
生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果
交换器:direct
- Producer
public class Producer public static void main(String[] args) throws Exception ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("queue.wq", true, false, false, null); //声明DIRECT类型的交换器 channel.exchangeDeclare("ex.wq", BuiltinExchangeType.DIRECT, true, false, null); channel.queueBind("queue.wq", "ex.wq", "key.wq"); for (int i = 0; i < 15; i++) channel.basicPublish("ex.wq", "key.wq", null, ("工作队列: " + i).getBytes("UTF-8")); channel.close(); connection.close();
- Consumer
public class Consumer public static void main(String[] args) throws Exception ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //确保MQ中有该队列,如果没有则创建 channel.queueDeclare("queue.wq", true, false, false, null); channel.basicConsume("queue.wq", new DeliverCallback() @Override public void handle(String consumerTag, Delivery delivery) throws IOException System.out.println("推送来的消息:" + new String(delivery.getBody(), "UTF-8")); , new CancelCallback() @Override public void handle(String consumerTag) throws IOException System.out.println("Cancel: " + consumerTag); );
- 测试
启动三个Consumer,接着启动Producer,可以看到三个Consumer平均的收到总共15个消息
四、发布订阅模式
使用fanout类型交换器,routingKey忽略
每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息
消息广播给所有订阅该消息的消费者
交换器:fanout
- Producer
public class Producer public static void main(String[] args) throws Exception ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //声明fanout类型的交换器 channel.exchangeDeclare("ex.fan", BuiltinExchangeType.FANOUT, true, false, null); for (int i = 0; i < 20; i++) channel.basicPublish("ex.fan", //fanout类型的交换器不需要指定路由键 "", null, ("hello fan: " + i).getBytes(StandardCharsets.UTF_8)); channel.close(); connection.close();
- Consumer
创建三个Consumer(OneConsumer, TwoConsumer, ThreeConsumer)
TwoConsumer, ThreeConsumer和OneConsumer内容一样,修改下System.out.println内容public class OneConsumer public static void main(String[] args) throws Exception ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //生成临时队列,队列名字由RabbitMQ自动生成 final String queueName = channel.queueDeclare().getQueue(); System.out.println("OneConsumer生成的临时队列名字:" + queueName); //声明一个fanout交换器 channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null); //绑定,fanout类型的交换器绑定不需要routingKey channel.queueBind(queueName, "ex.fan", ""); channel.basicConsume(queueName, (consumerTag, delivery) -> System.out.println("OneConsumer:" + new String(delivery.getBody(), StandardCharsets.UTF_8) ), (consumerTag) -> System.out.println("OneConsumer Cancel: " + consumerTag));
- 测试
分别启动OneConsumer、TwoConsumer、ThreeConsumer,然后启动Producer,
可以看到三个Consumer都收到了20个消息
五、路由模式
让接收者只接收部分消息
例:通过直接模式的交换器将关键的错误信息记录到不同的log文件,同时在控制台正常打印所有的日志信息,这里模拟ERROR,FALTAL和WARN
交换器:direct
- Producer
public class Producer private static final String[] LOG_LEVEL = "ERROR", "FATAL","WARN" ; private static Random RANDOM= new Random(); public static void main(String[] args) throws Exception ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //direct交换器 channel.exchangeDeclare("ex.routing", "direct", false, false, null); for (int i = 0; i < 100; i++) String level = LOG_LEVEL[RANDOM.nextInt(100) % LOG_LEVEL.length]; channel.basicPublish("ex.routing", level, null, ("来自["+level+"]的消息").getBytes()); channel.close(); connection.close();
- Consumer
三个consumer(ErrorConsumer、FatalConsumer、WarnConsumer)
其它两个consumer分别修改对应的队列名(queue.fatal, queue.warn)和System.out.println的输出public class ErrorConsumer public static void main(String[] args) throws Exception ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare("ex.routing", "direct", false, false, null); //临时消息队列 channel.queueDeclare("queue.error", false, false, false, null); //绑定消息队列和交换器 channel.queueBind("queue.error", "ex.routing", "ERROR"); channel.basicConsume("queue.error", ((consumerTag, delivery) -> System.out.println("ErrorConsumer收到的消息:" + new String(delivery.getBody(), StandardCharsets.UTF_8)) ),(consumerTag) -> );
- 测试
分启动三个Consumer,之后启动Procucer, 可以看到不同的消息分别路由到了特定的Consumer
六、主题模式
使用 topic 类型的交换器,队列绑定到交换器、 bindingKey 时使用通配符
交换器将消息路由转发到具体队列时会根据消息routingKey 模糊匹配,比较灵活
要想 topic 类型的交换器, routingKey就不能随便写了,它必须得是点分单词
单词可以随便写,生产中一般使用消息的特征。如:“system.group.key”,该点分单词字符串最长255字节
bindingKey 也必须是这种形式
topic 类型的交换器背后原理跟 direct 类型的类似:只要队列 的 bindingKey 的值与消息的 routingKey 匹配,队列就可以收到该消息
有两个不同:
" * "星号表示匹配一个单词
" # "井号表示匹配0到多个单词
如果在 topic 类型的交换器中 bindingKey 使用 # ,则就是 fanout 类型交换器的行为
如果在 topic类型的交换器中 bindingKey 中不使用 * 和 # ,则就是 direct 类型交换器的行为
交换器:topic
- Producer
routingKey: level.area.workpublic class Producer private static final String[] LOG_LEVEL = "INFO", "ERROR", "WARN"; private static final String[] LOG_AREA = "Shanghai", "Beijing", "HongKong"; private static final String[] LOG_WORK = "programmer", "engineer", "artist"; private static final Random RANDOM = new Random(); public static void main(String[] args) throws Exception ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //direct交换器 channel.exchangeDeclare("ex.topic", "topic", true, false, null); String level, area, work; String routingKey, msg; for (int i = 0; i < 100; i++) level = LOG_LEVEL[RANDOM.nextInt(100) % LOG_LEVEL.length]; area = LOG_AREA[RANDOM.nextInt(100) % LOG_AREA.length]; work = LOG_WORK[RANDOM.nextInt(100) % LOG_WORK.length]; //用‘.’分隔维度 routingKey = level + "." + area + "." + work; msg = "LOG"+i+" ["+level+"] -> AREA:"+area+" -> WORK:"+work; channel.basicPublish("ex.topic", routingKey, null, msg.getBytes()); channel.close(); connection.close();
- Consumer
分别创建四个consumer
1)ShanghaiConsumer :只接收来自上海的消息
2)BeijingErrorConsumer :只收来自北京的ERROR的消息
3)HongKongProgrammerConsumer:只收来自香港的程序员的消息
4)ErrorConsumer:只收ERROR的消息public class ShanghaiConsumer public static void main(String[] args) throws Exception ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f"); final RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式)
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码