必学消息队列-RabbitMQ(下集)
Posted 摸鱼打酱油
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了必学消息队列-RabbitMQ(下集)相关的知识,希望对你有一定的参考价值。
个人简介
作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门。
文章目录
什么是RabbitMQ
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
MQ的特点
- MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。
- MQ遵循了AMQP协议的具体实现和产品。
MQ的使用场景
各种MQ对比
在目前主流的消息队列中有(ActiveMQ,RocketMQ,RabbitMQ,kafka)
RabbitMQ在上面的各种消息队列中对于消息的保护是十分到位的(不会丢失消息),相对于kafka,虽然kafka性能十分强悍,在大数据中处理海量数据游刃有余,但是kafka容易丢失消息,而RabbitMQ虽然性能不及kafka,但是也不会很差,对于消息要求完整性很高的系统中用RabbitMQ十分好。
SpringBoot+RabbitMQ
导入启动器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.9.RELEASE</version>
</dependency>
application.yml
spring:
rabbitmq:
username: ems
password: 123456
virtual-host: /ems
host: localhost
自定义RabbitTemplate
SpringBoot默认使用CachingConnectionFactory连接工厂
@Configuration
public class rabbitTemplateConfig
//注入SpringBoot默认的CachingConnectonFactory
@Bean
public RabbitTemplate rabbitTemplate(@Qualifier("rabbitConnectionFactory") CachingConnectionFactory cachingConnectionFactory)
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
/**
* 当mandatory标志位设置为true时
* 如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息
* 那么broker会调用basic.return方法将消息返还给生产者
* 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃
*/
rabbitTemplate.setMandatory(true);
//使用单独的发送连接,避免生产者由于各种原因阻塞而导致消费者同样阻塞
rabbitTemplate.setUsePublisherConnection(true);
return rabbitTemplate;
RabbitTemplate实现发送消息
最简单的使用HelloWorld
经过SpringBoot整合的RabbitMQ,发送消息只要一条语句
对比如下:
原生RabbitMQ:(11行)
public static void main(String[] args) throws IOException, TimeoutException
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("ems");
factory.setPassword("123456");
factory.setVirtualHost("/ems"); //虚拟主机
factory.setHost("127.0.0.1"); //rabbitMQ的主机名(ip)
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",true,false,false,null);
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"第一个RabbitMQ程序!!!".getBytes());
channel.close();
connection.close();
SpringBoot整合RabbitMQ:(1行)
@SpringBootTest
@RunWith(SpringRunner.class)
public class provider
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void send()
//一条代码即可发送消息
/**
* 参数1:交换机名称
* 参数2:路由键
* 参数3:消息内容(不需要转换成byte数组)
*/
rabbitTemplate.convertAndSend("","boot_hello","boot_helloWorld");
@Component //所有RabbitMQ的消费者都需要“”加上“”Spring的组件注解,RabbitMQ消费者监听方法不用运行都可以被自动生效。。。。
public class consumer
//RabbitMQ消费者监听方法
@RabbitListener(queuesToDeclare = @Queue(name = "boot_hello",durable = "true",exclusive = "false"
,autoDelete = "false"))
public void receive(String msg)
System.out.println(msg);
workqueue模式
@SpringBootTest
@RunWith(SpringRunner.class) //加载上下文
public class workqueueTest
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void send()
// System.out.println(rabbitTemplate);
for (int i = 0; i < 10; i++)
rabbitTemplate.convertAndSend("","boot_work","workqueue===>"+i);
@Component
public class consumer1
@RabbitListener(queuesToDeclare = @Queue(name = "boot_work",durable = "true"))
public void receive1(String msg1)
System.out.println("consumer1===>"+msg1);
@Component
class consumer2
@RabbitListener(queuesToDeclare = @Queue(name = "boot_work",durable = "true"))
public void receive2(String msg2)
System.out.println("consumer2===>"+msg2);
fanout模式
@SpringBootTest
@RunWith(SpringRunner.class)
public class fanoutTest
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test()
rabbitTemplate.convertAndSend("boot_fanout","","hello");
@Component
public class consumer3
@RabbitListener(bindings =
@QueueBinding(value = @Queue,exchange = @Exchange(value = "boot_fanout",type = "fanout"),key = "")
)
public void receive(String msg)
System.out.println("consumer1===>"+msg);
@Component
class consumer4
@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(value = "boot_fanout",type = "fanout"),key = ""))
public void receive(String msg)
System.out.println("consumer2===>"+msg);
direct模式
@SpringBootTest
@RunWith(SpringRunner.class)
public class directTest
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test()
rabbitTemplate.convertAndSend("direct_boot","user.log","direct");
@Component
public class directConsumer1
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "direct_boot",type = "direct")
,value = @Queue,key = "user")
)
public void receive(String msg)
System.out.println("consumer1===>"+msg);
@Component
class directConsumer2
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "direct_boot",type = "direct")
,value = @Queue,key = "user.log"
))
public void receive(String msg)
System.out.println("consumer2==>"+msg);
topic模式
@SpringBootTest
@RunWith(SpringRunner.class)
public class topicTest
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test()
rabbitTemplate.convertAndSend("topic_boot","user.hello.log","hello");
@Component
public class topicConsumer1
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "topic_boot",type = "topic")
,value = @Queue,key = "user.#"
))
public void receive(String msg)
System.out.println("consumer1==>"+msg);
@Component
class topicConsumer2
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "topic_boot",type = "topic")
,value = @Queue,key = "user.*"
))
public void receive(String msg)
System.out.println("consumer2==>"+msg);
RabbitMQ高级特性
消息队列的过期时间ttl
如果我们设置了消息队列的过期时间,假设我们设置了5000ms,5000ms过去了,如果这个队列还有未被消费的消息,那么这些消息将会被自动丢弃(无法找回)。。。。
队列里的消息的过期时间(有点坑)
消费者的消息的过期时间
设置消息队列的argument为x-message-ttl 为xxx值,比如value=“5000”,就是5秒过去了,消息队列未被消费的消息将会直接丢弃
坑:@argument注解设置参数一定要指定类型为Number子类,比如java.lang.Integer,不然会报错
比如:arguments = @Argument(name = “x-message-ttl”,value = “5000”,type = “java.lang.Integer”)
spring:
rabbitmq:
username: ems
password: 123456
virtual-host: /ems
host: localhost
listener:
direct:
acknowledge-mode: manual #手动确认
simple:
acknowledge-mode: manual #手动确认
@Test
public void test1()
MessageProperties messageProperties = new MessageProperties();
String msg = "hello_ttl";
Message message = new Message(msg.getBytes(),messageProperties);
rabbitTemplate.convertAndSend("ttl_queue","ttl_a",message);
/**
* ==小坑:
* 使用RabbitListener实现队列的过期时间ttl必须要指定argument的“type”为Number类的子类,比如java.lang.Integer
* =======切记,ttl和消息队列长度都要用Number的子类,使用默认的会报错======
* 因为argument默认是java.lang.String类型,必须修改。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
* 。。。
*/
//@Queue和@Exchange指定value就会使这个队列和交换机设置为不过期的,没有value就是暂时的
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "ttl_temp",durable = "true"
,arguments = @Argument(name = "x-message-ttl",value = "5000",type = "java.lang.Integer")//一定要指定类型
)
,exchange = @Exchange(value = "ttl_queue",type = "direct"),key = "ttl_a"
))
public void receive(String msg,Message message,Channel channel)
System.out.println("msg==="+msg);
System.out.println("message==="+message);
System.out.println("channel==="+channel);
// try
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); //手动确认
// catch (IOException e)
// e.printStackTrace();
//
指定消息的过期时间
生产者消息的过期时间
核心代码:messageProperties.setExpiration(“5000”);
@Test
public void test2()
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("5000"); //设置指定消息的过期时间
String str="ttl_test2";
Message message = new Message(str.getBytes(),messageProperties);
rabbitTemplate.convertAndSend("","ttl_declare",message);
@RabbitListener(queuesToDeclare = @Queue(name = "ttl_declare"))
public void receive1(String msg,Message message,Channel channel) throws IOException
System.out.println(msg);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
死信队列
消息放进死信队列的条件:
1:消息过期了,如果有死信队列则放入死信队列,如果没有死信队列则直接丢弃无法找回。
2:某个消息队列长度已经达到最大值,此时在把消息发送到这个队列中,如果有死信队列则放入死信队列,没有则丢弃
3:消息被拒绝(basic.reject / basic.nack)
**================创建死信队列步骤**
1:创建一个普通队列
@SpringBootTest
@RunWith(SpringRunner.class)
public class deadLetter
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test()
Message message = new Message("deadLetter".getBytes(),new MessageProperties());
rabbitTemplate.convertAndSend("","nomal_dead",message);
@Component
public class nomalQueue
/**
* 这里我们只演示一种消息放入死信队列的情况(当消息过期后)
* 在某个队列设置了x-dead-letter-exchange和x-dead-letter-routing-key后,如果出现丢弃消息就会
* 通过x-dead-letter-exchange和x-dead-letter-routing-key找到指定的队列,这个队列就会默认是死信队列
* 其实死信队列也是正常的队列。。。。配置全都一样
*/
@RabbitListener(queuesToDeclare = @Queue(value = "nomal_dead",arguments =
@Argument(name = "x-message-ttl",value = "5000",type = "java.lang.Integer"),
@Argument(name = "x-dead-letter-exchange",value = "deadletter_exchange1"),
@Argument(name = "x-dead-letter-routing-key",value = "deadletter_key1")
))
public void receive(String msg, Message message, Channel channel)
System.out.println("msg1="+msg);
@Component
public class deadLetterQueue
/**
* 这里的交换机和路由key都要和配置的死信交换机、死信路由key一样。
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("deadLetterQueue")
,exchange = @Exchange(value = "deadletter_exchange1",type = "direct")
,key = "deadletter_key1"
))
public void receive_deadLetter(String msg)
System.out.println(msg);
固定长度的消息队列
核心代码:arguments = @Argument(name = “x-max-length”,value = “6”,type = “java.lang.Integer”)
@Test
public void test()
Message message = new Message(("max").getBytes(),new MessageProperties());
rabbitTemplate.convertAndSend("","maxLength_queue",message);
@RabbitListener(queuesToDeclare = @Queue(value = "maxLength_queue",durable = "true"
,arguments = @Argument(name = "x-max-length",value = "6",type = "java.lang.Integer")
))
public void receive(String msg, Message message以上是关于必学消息队列-RabbitMQ(下集)的主要内容,如果未能解决你的问题,请参考以下文章