消息中间件-----RabbitMq集成springboot
Posted 阿里-马云的学习笔记
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息中间件-----RabbitMq集成springboot相关的知识,希望对你有一定的参考价值。
引入jar包
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the RabbitMQ + Spring Boot demo module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of this module. -->
    <groupId>cn.enjoyedu</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rq-springboot-with</name>
    <description>rq-springboot-with</description>

    <!-- Dependency versions below are inherited from the Spring Boot parent POM. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- AMQP starter: provides the Spring AMQP / RabbitMQ client classes used by the demo. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Test-scoped only; not packaged into the runtime artifact. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
配置文件
application.properties
# Application name.
spring.application.name=springboot-rabbitmq

# RabbitMQ broker connection settings (read by RabbitConfig via @Value).
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

# Passed to CachingConnectionFactory.setPublisherConfirms in RabbitConfig.
spring.rabbitmq.publisher-confirms=true
不过在实际项目中,这些配置项建议优先放在 Apollo 等配置中心统一管理,方便随时修改。
RabbitConfig
package cn.enjoyedu.config;

import cn.enjoyedu.RmConst;
import cn.enjoyedu.hello.UserReceiver;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.nio.charset.StandardCharsets;

/**
 * Spring configuration that wires the RabbitMQ connection factory, admin, template,
 * the demo queues/exchanges/bindings, a manually-acknowledged listener container and
 * the producer-side confirm/return callbacks.
 */
@Configuration
public class RabbitConfig {

    @Value("${spring.rabbitmq.host}")
    private String addresses;

    @Value("${spring.rabbitmq.port}")
    private String port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.publisher-confirms}")
    private boolean publisherConfirms;

    @Autowired
    private UserReceiver userReceiver;

    /**
     * Builds the connection factory from the injected {@code spring.rabbitmq.*} properties.
     *
     * @return a {@link CachingConnectionFactory} pointing at {@code host:port}
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses + ":" + port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        // Must be true if the producer confirm callback is to be invoked.
        connectionFactory.setPublisherConfirms(publisherConfirms);
        return connectionFactory;
    }

    /**
     * @param connectionFactory the connection factory the admin operates on
     * @return a {@link RabbitAdmin} bound to the given connection factory
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Customised {@link RabbitTemplate} with the mandatory flag set and both the
     * confirm and return callbacks registered.
     *
     * @return the configured template
     */
    @Bean
    public RabbitTemplate newRabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        // Mandatory flag: lets the broker hand unroutable messages back to the producer
        // (delivered to the return callback registered below).
        template.setMandatory(true);
        // Producer-side publish confirmation.
        template.setConfirmCallback(confirmCallback());
        // Callback for messages returned by the broker.
        template.setReturnCallback(returnCallback());
        return template;
    }

    // =============== Queues used with RabbitMQ's default exchange ===============
    // With the default exchange the binding key is the queue name.

    @Bean
    public Queue helloQueue() {
        return new Queue(RmConst.QUEUE_HELLO);
    }

    @Bean
    public Queue userQueue() {
        return new Queue(RmConst.QUEUE_USER);
    }

    // =============== Topic exchange demo: start ===============

    @Bean
    public Queue queueEmailMessage() {
        return new Queue(RmConst.QUEUE_TOPIC_EMAIL);
    }

    @Bean
    public Queue queueUserMessages() {
        return new Queue(RmConst.QUEUE_TOPIC_USER);
    }

    /**
     * Declares the topic exchange.
     *
     * @return the topic exchange named {@code RmConst.EXCHANGE_TOPIC}
     */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(RmConst.EXCHANGE_TOPIC);
    }

    /**
     * Binds the e-mail queue to the topic exchange with routing pattern {@code sb.*.email}.
     *
     * @return the binding
     */
    @Bean
    public Binding bindingEmailExchangeMessage() {
        return BindingBuilder
                .bind(queueEmailMessage())
                .to(exchange())
                .with("sb.*.email");
    }

    /**
     * Binds the user queue to the topic exchange with routing pattern {@code sb.*.user}.
     *
     * @return the binding
     */
    @Bean
    public Binding bindingUserExchangeMessages() {
        return BindingBuilder
                .bind(queueUserMessages())
                .to(exchange())
                .with("sb.*.user");
    }

    // =============== Topic exchange demo: end ===============

    // =============== Fanout exchange demo: start ===============

    // Method name kept as-is: it is the bean name and is matched by the parameter of bindingExchangeA.
    @Bean
    public Queue AMessage() {
        return new Queue("sb.fanout.A");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(RmConst.EXCHANGE_FANOUT);
    }

    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    // =============== Fanout exchange demo: end ===============

    // =============== Consumer acknowledgement ===============

    /**
     * Listener container for the user queue with manual acknowledgement; messages are
     * delivered to the injected {@link UserReceiver}.
     *
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(userQueue());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(userReceiver);
        return container;
    }

    // =============== Producer publish confirmation ===============

    /**
     * @return callback that prints whether the broker acknowledged a published message
     */
    @Bean
    public RabbitTemplate.ConfirmCallback confirmCallback() {
        return (CorrelationData correlationData, boolean ack, String cause) -> {
            if (ack) {
                System.out.println("发送者确认发送给mq成功");
            } else {
                // Failed publish: this is where a resend/compensation would be handled.
                System.out.println("发送者发送给mq失败,考虑重发:" + cause);
            }
        };
    }

    /**
     * @return callback that prints the details of a message returned by the broker
     */
    @Bean
    public RabbitTemplate.ReturnCallback returnCallback() {
        return (Message message, int replyCode, String replyText,
                String exchange, String routingKey) -> {
            System.out.println("无法路由的消息,需要考虑另外处理。");
            System.out.println("Returned replyText:" + replyText);
            System.out.println("Returned exchange:" + exchange);
            System.out.println("Returned routingKey:" + routingKey);
            // Decode explicitly as UTF-8 instead of the JVM platform default charset.
            // NOTE(review): assumes payloads were encoded as UTF-8 by the template's
            // message converter — confirm if a custom converter is configured.
            String msgJson = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("Returned Message:" + msgJson);
        };
    }
}
生产者
TopicSender
package cn.enjoyedu.topic;

import cn.enjoyedu.RmConst;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Demo producer that publishes three text messages to the topic exchange
 * {@code RmConst.EXCHANGE_TOPIC}, each with a different routing key.
 */
@Component
public class TopicSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends, in order: one message with the e-mail routing key, one with the user
     * routing key, and one with the literal key {@code "errorkey"}.
     */
    public void send() {
        announceAndPublish(
                "TopicSender send the 1st : ", RmConst.RK_EMAIL, "I am email mesaage msg======");
        announceAndPublish(
                "TopicSender send the 2nd : ", RmConst.RK_USER, "I am user mesaages msg########");
        // "errorkey" matches neither "sb.*.email" nor "sb.*.user", the two patterns
        // bound to this exchange in RabbitConfig.
        announceAndPublish(
                "TopicSender send the 3rd : ", "errorkey", "I am error mesaages msg");
    }

    /** Prints {@code banner + payload}, then publishes the payload under the given routing key. */
    private void announceAndPublish(String banner, String routingKey, String payload) {
        System.out.println(banner + payload);
        this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, routingKey, payload);
    }
}
消费者
TopicEmailMessageReceiver
package cn.enjoyedu.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer registered as a listener on the queue named {@code sb.info.email}.
 */
@Component
@RabbitListener(queues = "sb.info.email")
public class TopicEmailMessageReceiver {

    /**
     * Handles a text message by printing it to standard output with a class-name prefix.
     *
     * @param msg the message body as a string
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "TopicEmailMessageReceiver : " + msg;
        System.out.println(line);
    }
}
以上是关于消息中间件-----RabbitMq集成springboot的主要内容,如果未能解决你的问题,请参考以下文章