消息中间件RabbitMQ的使用
Posted cjh-notes
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息中间件RabbitMQ的使用相关的知识,希望对你有一定的参考价值。
原理场景
MQ在所有项目里面都很常见,
1、减少非紧急性任务对整个业务流程造成的延时;
2、减少高并发对系统所造成的性能上的影响;
举例几个场景:
1、给注册完成的用户派发优惠券、加积分、发消息等(派发优惠券、加积分、发消息这些属于非紧急性任务,可交由MQ进行处理,先让用户完成注册)
2、实时收集用户运动数据,并且收集数据后还需要比较复杂和耗时的操作才能完成业务处理(实时的数据采集任务一般并发量都是很高的,我们就应该先发送到MQ,再进行有序的处理)
另外说明一下,高并发的问题有很多种处理手段,而MQ是我认为的最稳健、简单的手段之一,所以我会优先使用
大概流程
首先用docker安装RabbitMQ,快捷
进入控制台,下图简单介绍下各个功能模块
大致流程
1、发送消息到MQ
2、MQ接收并保存消息等待消费
3、消费者有序地进行消息处理
Springboot中的使用
springboot中使用MQ超级简单
1、配置
#RabbitMq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=test
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.prefetch=1 #这个根据自己业务情况来定
2、发送
@Autowired private AmqpTemplate amqpTemplate; @Test public void testMq() User user = new User(); user.setName("cjh陈"); amqpTemplate.convertAndSend("exchange_01","que_01", JSON.toJSONString(JSONResult.SUCC("",user)));
3、消费
@RabbitListener(containerFactory = "containerFactory",concurrency = "10",bindings = @QueueBinding(declare = "false",value = @Queue(value = "que_01"), exchange = @Exchange(declare = "false",value = "exchange_01",type = "fanout"))) public void process(Message message, com.rabbitmq.client.Channel channel) Long deliveryTag = null; String data = null; try deliveryTag = message.getMessageProperties().getDeliveryTag(); data = new String(message.getBody(), "UTF-8"); //你的业务处理 test01(data); catch (Exception e) logger.warn("MQ异常 , , " , e.getMessage(),e.getCause(),data); finally logger.warn("======消息处理结束"); try channel.basicAck(deliveryTag, false); // 确认消息成功消费(配置需要开启消费确认模式) catch (IOException e) logger.warn("======MQ应答出错,请检查");
3步完成使用
特别说明几点
1、AmqpTemplate 好像做不到发布确认,要用RabbitTemplate,发布确认我主要用在分布式事务的场景
2、containerFactory可以不配置,根据实际情况来,下面再说明
3、concurrency指并发量,实时性要求很高的话,prefetch应该设低点或者设置成1,concurrency的值调高点
简单画了一下,,,留着自己看,,
4、declare = "false",很常用,意思就是说加入你MQ控制台里面已经新建好队列或者交换机了,这里就应该配false表示程序不再进行重新定义,不然容易发生报错(当重新定义的参数与已定义的参数不一致时就会报错)
5、权限方面的处理以后再记录,,
回到上面第2点,containerFactory什么时候会用到?
某些配置需要自定义,比如线程池的大小,
当concurrency数值放大的时候,比如100,我发现大部分的消费者并没有工作,这是因为被线程池的大小所限制,网上的人说线程池大小默认是50,我也没去查估计差不多也就这个数,那么这个时候我们就需要自定义的containerFactory:
package xxx; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author cjh * @Package xxx * @Description: * @date: 2019/6/30 20:19 */ @Configuration @EnableRabbit public class MqContainerFactory implements RabbitListenerConfigurer /** * containerFactory * @Description: 自定义配置 * @param * @return */ @Bean public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) throws Exception SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //必须是concurrency的两倍以上 ExecutorService service=Executors.newFixedThreadPool(200); factory.setTaskExecutor(service); factory.setPrefetchCount(1); return factory; @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setMessageConverter(new MappingJackson2MessageConverter()); return factory; @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
记录完成
有错欢迎指正,转载请注明博客出处:http://www.cnblogs.com/cjh-notes/
以上是关于消息中间件RabbitMQ的使用的主要内容,如果未能解决你的问题,请参考以下文章