rabbitmq 五种消息模型
Posted xie-qi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq 五种消息模型相关的知识,希望对你有一定的参考价值。
具体可参考:https://note.youdao.com/ynoteshare1/index.html?id=db637b43f0ab16cf6db9b9b92d562ca8&type=notebook#/7A55B7E7787A49D0B2E2265D437F3C19;这里写的很具体了;
一、基础环境:
1)创建springboot项目, 并导入如下依赖;
<!--rabbitmq依赖--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.4.3</version> </dependency> <!--springboot mq支持--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2)写一个连接工具类;
/** * 建立连接的工具类,用来简单测试消息发送接收功能 * 实际上与springboot使用不需要该类 */ public class ConnectionUtil { /** * 建立连接 * @return * @throws IOException * @throws TimeoutException */ public static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory factory=new ConnectionFactory(); //mq服务器IP factory.setHost("192.168.190.141"); //铜须端口号 factory.setPort(5672); //虚拟主机 factory.setVirtualHost("xieqi"); factory.setUsername("xieqi"); factory.setPassword("123456"); return factory.newConnection(); } }
二、消息模型;
1)、基本消息模型(basic queues)
producer--- |队列| ---consumer
功能:一个生产者P发送消息到队列Q,一个消费者C接收。实现了基本的消息的生产和消费。一对一。
生产者:
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1.建立连接 Connection connection = ConnectionUtil.getConnection(); //2.建立通道 Channel channel = connection.createChannel(); //3.声明队列 channel.queueDeclare( "simple_queue",//队列名称 false,//设置是否持久化 false,//设置是否排他(仅申明他的连接可见) false,//是否自动删除 null);//参数设置 for (int i = 0; i <10 ; i++) { String message=" hello rabbit"+i; //通过channel发送消息 channel.basicPublish( "",//exchange 交换机 ""表示使用默认 "simple_queue",// routing_key 路由key null,//设置项 message.getBytes());//消息 System.out.println("消息发送成功:"+message); } //关闭通道和连接 channel.close(); connection.close(); } }
自动确认消费者:
/** * 描述: * 消费消息,自动确认(ACK) * @author bigpeng * @create 2019-07-15 13:40 */ public class ConsumerAutoACK { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [x] received : " + msg + "!"); } }; // 监听队列,第二个参数:是否自动进行消息确认。 channel.basicConsume(QUEUE_NAME, true, consumer); } }
手动确认消费者:
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //1.获取连接 Connection connection = ConnectionUtil.getConnection(); //2.创建通道 Channel channel = connection.createChannel(); //3.声明队列 channel.queueDeclare("simple_queue", false,false,false,null); //4 定义队列的消费者 DefaultConsumer consumer=new DefaultConsumer(channel){ //处理消息,当监听到队列中有消息时,会触发该方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("获取到队列simple_queue的消息:" +message); Random random=new Random(); if(random.nextInt(10)%2==1) { //手动ACK //成功ACK //同一个会话, consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1,可以做此消息处理通道的名字。 channel.basicAck(envelope.getDeliveryTag(), false); System.out.println("消费成功"); }else { //失败的ACK channel.basicNack(envelope.getDeliveryTag(), false, true);//是否重回队列 System.out.println("消费失败"); } } }; //将consumer关联到通道 //自动ACK // channel.basicConsume( // "simple_queue",//队列名 // true,//是否自动消息确认(ACK) // consumer);//Consumer对象 //手动ACK channel.basicConsume( "simple_queue",//队列名 false,//是否自动消息确认(ACK) consumer);//Consumer对象 } }
2)、工作队列(work queues)
功能:一个生产者,多个消费者。写法与基本消息模型类似,只不过原来是一个消费者,现在是多个消费者。多个消费者处理队列中的数据。
特点:
1)可以有多个消费者;
2)一条消息只能被多个消费者中的一个消费。
3)、发布/订阅模式 Publish/Subscribe
功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者
与工作队列区别:
1)工作队列只有一个队列,而发布订阅有多个队列
2)工作队列一个消息只能被多个消费者中的一个消费,而发布订阅一个消息会被多个订阅的消费者消费。
3)发布订阅比工作队列多出一个交换机概念,用来绑定消息发送到哪些消费者。 其实之前的两种模式也需要交换机,其使用默认交换,我们通过空字符串(“”)来识别。
4)、路由模式(Routing)
功能:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key。只有当两个key相匹配时,消息才会发送到对应的消费者队列。即在广播的基础上有了路由的功能。 type 指定为direct。
5)、主题订阅模式(topic)
功能:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配; 符号#:匹配一个或者多个词 lazy.# 可以匹配 lazy.irs或者lazy.irs.cor; 符号*:只能匹配一个词 lazy.* 可以匹配 lazy.irs或者lazy.cor
以上是关于rabbitmq 五种消息模型的主要内容,如果未能解决你的问题,请参考以下文章