SpringBoot整合RabbitMQ
Posted 海之浪子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合RabbitMQ相关的知识,希望对你有一定的参考价值。
SpringBoot整合RabbitMQ
公司最近在开发CRM系统的时候,需要将ERP的订单数据实时的传输到CRM系统中,但是由于每天的订单量特别大,采用实时获取后并存储到数据库中,接口的相应速度较慢,性能较差。经过经过多方位评估采用在数据库与接口层添加RabbitMQ作为缓冲层来实现。
具体为:
1、ESB将订单数据实时推送至CRM Controller接口
2、CRM Controller调用RabbitMQ的生产者将数据推送至RabbitMQ服务器
3、消费者实时消费RabbitMQ的订单并进行处理并存储至数据库中这里只是实现了RabbitMQ的生产者和消费者,采用的RabbitMQ Direct模式,代码如下:
@SpringBootApplication @EnableScheduling public class RabbitMQApplication { public static void main(String[] args) { SpringApplication.run(RabbitMQApplication.class, args); } }
@Configuration public class ProducerConfig { private static final Logger logger = LoggerFactory.getLogger(ProducerConfig.class); @Autowired private CachingConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { connectionFactory.setPublisherConfirms(true); //Use full publisher confirms, with correlation data and a callback for each message. connectionFactory.setPublisherReturns(true); connectionFactory.setConnectionTimeout(6000); //超时时间 RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //采用默认模式创建模板 rabbitTemplate.setMandatory(true); //强制 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //TODO 数据没有投递成功,可以在这里进行一些操作,比如讲消息保存到数据库中,等待下次进行投递 logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); } }); return rabbitTemplate; } }
@Component public class ProducerProcessing { @Autowired private RabbitTemplate rabbitTemplate; @Scheduled(fixedDelay = 1000, initialDelay = 500) public void directSend() { rabbitTemplate.convertAndSend(ReciverProcessing.DIRECT_EXCHANGE, ReciverProcessing.DIRECT_ROUTINGKEY, "订单信息"); System.out.println("消息投递完成"); } // @Scheduled(fixedDelay = 1000, initialDelay = 500) public void fanoutSend() { rabbitTemplate.convertAndSend(ReciverProcessing.FANOUT_EXCHANGE, "", "订单信息"); System.out.println("消息投递完成"); } }
@Configuration public class ReciverProcessing { //Direct模式 public static final String DIRECT_EXCHANGE = "direct_exchange"; public static final String DIRECT_ROUTINGKEY = "direct_rountingkey"; public static final String DIRECT_QUEUE = "direct_queue"; @Autowired private ConnectionFactory connectionFactory; //发布订阅模式 public static final String FANOUT_EXCHANGE = "fanout_exchange"; public static final String FANOUT_QUEUE1 = "fanout_queue1"; public static final String FANOUT_QUEUE2 = "fanout_queue2"; /** * 实例化 DirectExchange * * @return */ @Bean public DirectExchange directExchange() { /** * Exchange名称 * Exchange持久化 * Exchange 是否自动删除 */ return new DirectExchange(DIRECT_EXCHANGE, true, false); } /** * 创建广播模式交换机 * * @return */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE, true, false); } @Bean public Queue directQueue() { /** * 队列名称 * 队列是否持久化 */ return new Queue(DIRECT_QUEUE, true); //队列持久 } @Bean public Binding directBinding() { /** * 交换器与队列绑定 */ return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY); } @Bean public SimpleMessageListenerContainer DirectMessageContainer() { //创建队列监听容器 SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); //监听的队列,可多个 simpleMessageListenerContainer.setQueues(directQueue()); simpleMessageListenerContainer.setExposeListenerChannel(true); //最大消费者数量 simpleMessageListenerContainer.setMaxConcurrentConsumers(10); //设置并发数 simpleMessageListenerContainer.setConcurrentConsumers(3); //一次拉取消息的数量 simpleMessageListenerContainer.setPrefetchCount(1); //确认模式 simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String body = new String(message.getBody()); //TODO 这里开始处理消息,处理消息成功以后可以发送确认消息成功 System.out.println("当前线程 " + Thread.currentThread().getName() + " 收到的消息为: " + body); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true); } }); return simpleMessageListenerContainer; } /** * 队列1 * * @return */ @Bean public Queue fanoutQueue1() { return new Queue(FANOUT_QUEUE1, true); } /** * 队列2 * * @return */ @Bean public Queue fanoutQueue2() { return new Queue(FANOUT_QUEUE2, true); } /** * 队列1绑定 * * @return */ @Bean public Binding fanoutBinding1() { return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()); } /** * 队列2绑定 * * @return */ @Bean public Binding fanoutBinding2() { return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); } /** * 监听队列进行消费,这里相当于一个消费者监听了两个队列,在实际开发应用中,发布订阅模式监听两个队列,收到的消息是一样的,所以这样做是没有什么意义的 * 可以设置两个消费者,每个消费者只监听其中的一个队列,收到消息后根据需要来进行不同的处理 * * @return */ @Bean public SimpleMessageListenerContainer fanoutMessageContainer() { //创建队列监听容器 SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); //监听的队列,可多个 simpleMessageListenerContainer.setQueues(fanoutQueue1(), fanoutQueue2()); simpleMessageListenerContainer.setExposeListenerChannel(true); //最大消费者数量 simpleMessageListenerContainer.setMaxConcurrentConsumers(10); //设置并发数 simpleMessageListenerContainer.setConcurrentConsumers(3); //一次拉取消息的数量 simpleMessageListenerContainer.setPrefetchCount(1); //确认模式 simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String body = new String(message.getBody()); //TODO 这里开始处理消息,处理消息成功以后可以发送确认消息成功 System.out.println("当前线程 " + Thread.currentThread().getName() + " 收到的消息为: " + body); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true); } }); return simpleMessageListenerContainer; } }
以上是关于SpringBoot整合RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ基础组件和SpringBoot整合RabbitMQ简单示例