RabbitMQ工作模式
Posted 不断前进的皮卡丘
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ工作模式相关的知识,希望对你有一定的参考价值。
✨ RabbitMQ工作模式
📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件
Work queues工作队列模式
基本介绍
- 多个消费者共同消费同一个队列中的消息,可以实现快速消费,避免消息积压,但是多个消费者消费队列的消息的时候,是互斥的,同一个消息只能被一个消费者消费,不可以被多个消费者消费。
- 应用:对于一些任务比较多的情况,使用工作队列可以提高任务处理的速度
编写代码
抽取公共部分,写一个工具类
- 我们知道像连接等操作,其实基本上代码都是一样的,每一次写重复的代码其实没有什么意义,我们可以写一个工具类来封装这些操作。
public class ConnectionUtil
public static Connection getConnection() throws Exception
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("192.168.137.118");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
生产者
public class Producer
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 1; i <= 10; i++)
String body = i+"hello rabbitmq~~~";
channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
channel.close();
connection.close();
消费者1
public class Consumer1
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("body:"+new String(body));
;
channel.basicConsume(QUEUE_NAME,true,consumer);
消费者2
代码和上面相同
测试
我们先启动两个消费者,然后再启动生产者发送消息,到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。
发布订阅模式
订阅模式类型
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特定队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- 发布订阅模式
- 每个消费者都监听自己的队列
- 生产者把消息发送给broker,然后交换机把消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
- 交换机只负责转发消息,并没有存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
代码编写
生产者
public class Producer
public static void main(String[] args) throws Exception
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_fanout";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6. 创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
//8. 发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
消费者1
public class Consumer1
public static void main(String[] args) throws Exception
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台.....");
;
channel.basicConsume(queue1Name,true,consumer);
消费者2
public class Consumer2
public static void main(String[] args) throws Exception
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_fanout_queue2";
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台.....");
;
channel.basicConsume(queue2Name,true,consumer);
测试
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;达到广播的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:
发布订阅模式和工作队列模式的区别
- 工作队列模式不需要定义交换机,而发布订阅模式需要定义交换机
- 发布订阅模式需要设置队列和交换机的绑定,而工作队列模式不需要设置,事实上是因为工作队列模式会把队列绑定到默认的交换机
- 发布订阅模式中,生产者向交换机发送消息;工作队列模式则是生产者向队列发送消息(底层使用默认交换机)
Routing 路由模式
基本介绍
- P:生产者,向交换机发送消息的时候,会指定一个routing key
- X:Exchange(交换机),接收生产者的消息,然后把消息传递给和routing key完全匹配的队列
- C1:消费者,它所在队列指定了需要routing key为error的信息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
路由模式的特点
- 队列和交换机的绑定是需要指定routing key的,不可以随意绑定
- 消息的发送方向交换机发送消息的时候,也需要指定消息的routing key
- 交换机不再把消息交给每一个绑定的队列,而是根据消息的routing key来进行判断,只有队列的routing key和消息的routing key完全一样才会接收到消息。
编写代码
生产者
public class Producer
public static void main(String[] args) throws Exception
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_direct";
// 创建交换机
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
// 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
// 声明(创建)队列
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 队列绑定交换机
// 队列1绑定error
channel.queueBind(queue1Name,exchangeName,"error");
// 队列2绑定info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";
// 发送消息
channel.basicPublish(exchangeName,"warning",null,message.getBytes());
System.out.println(message);
channel.close();
connection.close();
运行
消费者1
public class Consumer1
public static void main(String[] args) throws Exception
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台.....");
;
channel.basicConsume(queue1Name,true,consumer);
消费者2
public class Consumer2
public static void main(String[] args) throws Exception
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_direct_queue2";
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("body:"+new String(body));
System.out.println("将日志信息存储到数据库.....");
;
channel.basicConsume(queue2Name,true,consumer);
Topics 通配符模式
基本介绍
- Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
- Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
- 通配符规则:
- #:匹配0个或者多个词
- *:刚好可以匹配一个词
代码
生产者
public class Producer
public static void main(String[] args) throws Exception
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 绑定队列和交换机
/**
* 参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
// routing key 系统的名称.日志的级别。
//需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
//发送消息goods.info,goods.error
channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
channel.close();
connection.close();
消费者1
public class Consumer1
public static void main(String[] args) throws Exception
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("body:"+new String(body));
;
channel.basicConsume(queue1Name,true,consumer);
消费者2
public class Consumer2
public static 以上是关于RabbitMQ工作模式的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ入门教程:扇形交换机发布/订阅(Publish/Subscribe)
WebAPi的可视化输出模式(RabbitMQ消息补偿相关)——所有webapi似乎都缺失的一个功能
WebAPi的可视化输出模式(RabbitMQ消息补偿相关)所有webapi似乎都缺失的一个功能