RabbitMq+Spring boot 消息生产者向队列发送消息
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq+Spring boot 消息生产者向队列发送消息 相关的知识,希望对你有一定的参考价值。
本人学习新框架方法、
一、先学习框架基本知识,也就是看这本书的前三章,了解基本概念。比如这个Rabbitmq,我会先看一些概念,比如,交换机,路由器,队列,虚拟机。
二、然后写代码,写demo,有哪些不懂的地方直接再去翻书或者google找资料,带着问题去学习,学的更快更扎实一些。
三、然后再看这个框架的应用场景,自己能否独立的写一些简单的项目,来验证自己的成果。
四、实际项目积累经验。
RabbitMq 消息生产者向队列发送消息 (一)
MQ分为消息生产者和消息消费者,这次做的主要是消息的生产者的讲述。就是发送消息到相应的队列上。
本文是用spring boot 来做的。
步骤1:
生成RabbitMqConfig配制文件,里面配制了队列,连接工厂,交换机等一些信息。
package com.basic.rabbitmq.productor.config; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.core.env.Environment; import javax.swing.event.ChangeEvent; /** * Rabbitmq配置类 * * 配制一些用户名和密码,还有就是配制队列,交换机,还有路由健 * Created by sdc on 2017/7/4. */ @Configuration @ComponentScan(basePackages = {"com.basic.rabbitmq.productor"}) @PropertySource(value = {"classpath:application.properties"}) public class RabbitMqConfig { @Autowired private Environment env; /** * 构建connectionfactory * @return * @throws Exception */ @Bean public ConnectionFactory connectionFactory() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(env.getProperty("spring.rabbitmq.host")); connectionFactory.setPort(Integer.valueOf(env.getProperty("spring.rabbitmq.port").trim())); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username")); connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password")); // connectionFactory.setPublisherReturns(true); return connectionFactory; } /** * CachingConnectionFactory * @return * @throws Exception */ @Bean public CachingConnectionFactory cachingConnectionFactory() throws Exception { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory()); /** 如果要进行消息回调,则这里必须要设置为true */ cachingConnectionFactory.setPublisherConfirms(true); // 必须要设置 return cachingConnectionFactory; } /** * RabbitTemplate,类似于jdbctemplate一样的工具类 * @return * @throws Exception */ @Bean public RabbitTemplate rabbitTemplate() throws Exception { RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory()); // rabbitTemplate.setChannelTransacted(true); //这个设置参数 // rabbitTemplate.setMandatory(true); // rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()); // rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback()); return rabbitTemplate; } @Bean public AmqpAdmin amqpAdmin() throws Exception { return new RabbitAdmin(cachingConnectionFactory()); } /** * 通道处理 * @return * @throws Exception */ @Bean public Channel channel() throws Exception { //队列名字 String name = env.getProperty("emial.server.queue").trim(); // 是否持久化 boolean durable = StringUtils.isNotBlank(env.getProperty("emial.server.queue.durable").trim())? Boolean.valueOf(env.getProperty("emial.server.queue.durable").trim()) : true; // 仅创建者可以使用的私有队列,断开后自动删除 boolean exclusive = StringUtils.isNotBlank(env.getProperty("emial.server.queue.exclusive").trim())? Boolean.valueOf(env.getProperty("emial.server.queue.exclusive").trim()) : false; // 当所有消费客户端连接断开后,是否自动删除队列 boolean autoDelete = StringUtils.isNotBlank(env.getProperty("emial.server.queue.autoDelete").trim())? Boolean.valueOf(env.getProperty("emial.server.queue.autoDelete").trim()) : false; ConnectionFactory connectionFactory = connectionFactory(); Connection connection = connectionFactory.newConnection(); Channel channes = connection.createChannel(); channes.queueDeclare(name, durable, exclusive, autoDelete, null); return channes; } /** * 队列 * @return */ @Bean public Queue queue() throws Exception{ //队列名字 String name = env.getProperty("emial.server.queue").trim(); // 是否持久化 boolean durable = StringUtils.isNotBlank(env.getProperty("emial.server.queue.durable").trim())? Boolean.valueOf(env.getProperty("emial.server.queue.durable").trim()) : true; // 仅创建者可以使用的私有队列,断开后自动删除 boolean exclusive = StringUtils.isNotBlank(env.getProperty("emial.server.queue.exclusive").trim())? Boolean.valueOf(env.getProperty("emial.server.queue.exclusive").trim()) : false; // 当所有消费客户端连接断开后,是否自动删除队列 boolean autoDelete = StringUtils.isNotBlank(env.getProperty("emial.server.queue.autoDelete").trim())? Boolean.valueOf(env.getProperty("emial.server.queue.autoDelete").trim()) : false; return new Queue(name, durable, exclusive, autoDelete); } /** * 配制交换机,交换机类型为topic * @return */ @Bean public TopicExchange exchange () { //交换机的名字 String name = env.getProperty("emial.server.exchange"); // 是否持久化 boolean durable = StringUtils.isNotBlank(env.getProperty("emial.server.exchange.durable").trim())? Boolean.valueOf(env.getProperty("emial.server.exchange.durable").trim()) : true; // 当所有消费客户端连接断开后,是否自动删除队列 boolean autoDelete = StringUtils.isNotBlank(env.getProperty("emial.server.exchange.autoDelete").trim())? Boolean.valueOf(env.getProperty("emial.server.exchange.autoDelete").trim()) : false; //创建交换机 return new TopicExchange(name, durable, autoDelete); } /** * 绑定,交换机要绑定要队列上,交换机才能把消息放入到相应的队列上。 * @return */ @Bean public Binding binding() throws Exception{ String routekey = env.getProperty("emial.server.routekey").trim(); return BindingBuilder.bind(queue()).to(exchange()).with(routekey); } }
步骤二配置文件:
application.properties
#指定具体使用哪种配置环境,此处指定使用application-prod.properties配置文件中的环境
spring.profiles.active=dev
application-dev.properties
#spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
emial.server.exchange=email_exchange
emial.server.exchange.durable=true
emial.server.exchange.autoDelete=false
emial.server.queue=email_queue
emial.server.queue.durable=true
emial.server.queue.exclusive=false
emial.server.queue.autoDelete=false
emial.server.routekey=email_route_key
emial.server.exchange.bindingkey=email_route_key
具体接口EmailService服务:
package com.basic.rabbitmq.productor.service; /** * 邮件服务 * Created by sdc on 2017/7/5. */ public interface EmailService { /** * 发送邮件任务存入消息队列 * @param message * @throws Exception */ public void sendEmailForQueue(String message) throws Exception; }
package com.basic.rabbitmq.productor.service.impl; import com.basic.rabbitmq.productor.model.SendMessage; import com.basic.rabbitmq.productor.service.EmailService; import com.basic.rabbitmq.productor.util.MessageSender; import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * 邮件服务 * Created by sdc on 2017/7/5. */ @Service("emailService") public class EmailServiceImpl implements EmailService { private static Logger logger = LoggerFactory.getLogger(EmailServiceImpl.class); // @Resource( name = "rabbitTemplate" ) // private RabbitTemplate rabbitTemplate; @Value("${emial.server.exchange}") private String exchange; @Value("${emial.server.routekey}") private String routeKey; @Resource(name = "messageSender") private MessageSender messageSender; @Override public void sendEmailForQueue(String message) throws Exception { try { // rabbitTemplate.convertAndSend(exchange, routeKey, message); messageSender.handlerMessage(message); }catch (Exception e){ // logger.error("EmailServiceImpl.sendEmail", ExceptionUtils.getMessage(e)); e.printStackTrace(); } } }
Controller层,写一个接口,往Rabbitmq上发送消息。
package com.basic.rabbitmq.productor.controller; import com.basic.rabbitmq.productor.constant.WebStatusEnum; import com.basic.rabbitmq.productor.model.ResponseVo; import com.basic.rabbitmq.productor.service.EmailService; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.ModelAndView; import javax.annotation.Resource; /** * 接口测试类 * * Created by sdc on 2017/7/5. */ @RestController @RequestMapping(value = "/email") public class EmailController extends BaseController{ @Resource(name = "emailService") private EmailService emailService; /** * 发供邮件服务 * @return * @throws Exception */ @RequestMapping(value="/sendEmail", method = RequestMethod.GET) public ResponseVo<?> sendEmail() throws Exception { String emailMessage = "邮件消息"; emailService.sendEmailForQueue(emailMessage); return generateResponseVo(WebStatusEnum.SUCCESS, ""); } }
EmailController的父类,抽出来,用于别的controller类集成。
package com.basic.rabbitmq.productor.controller; import com.basic.rabbitmq.productor.constant.WebStatusEnum; import com.basic.rabbitmq.productor.model.ResponseVo; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpSession; /** * 提取出来一个类,用来作为controller层的父类,统一的返回格式。 * * Created by sdc on 2017/7/6. */ public class BaseController { /** * 生成统一的返回响应对象 * * @param webStatusEnum 状态码枚举 * @param data 数据对象 * @param <T> 数据对象类型参数 * @return */ public <T> ResponseVo generateResponseVo(WebStatusEnum webStatusEnum, T data) { return new ResponseVo(webStatusEnum.getCode(), webStatusEnum.getDesc(), data); } /** * 获取当前会话 * * @param request 请求 * @return httpSession */ public HttpSession getCurrentSession(HttpServletRequest request) { return request.getSession(); } }
常量类,用于返回给页面展示的公共类:
package com.basic.rabbitmq.productor.model; import java.io.Serializable; /** * 定义统一的返回格式,用于和前端一起沟通 * * Created by sdc on 2017/7/7. */ public class ResponseVo<T> implements Serializable { private static final long serialVersionUID = 1L; /** * 状态码 */ private String code; /** * 状态码对应的信息 */ private String message; /** * 数据对象 */ private T data; public ResponseVo(String code, String message, T data) { super(); this.code = code; this.message = message; this.data = data; } public String getCode() { return code; } public void setCode(String code) { this.code = code; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } public T getData() { return data; } public void setData(T data) { this.data = data; } }
package com.basic.rabbitmq.productor.constant; /** * Created by sdc on 2017/7/7. */ public enum WebStatusEnum { /** * 定义接口返回状态码 * * 通用部分范围 5000 + * 业务使用范围 2000 至 4000 */ SUCCESS("5000", "成功"), FAILED("7000", "失败"), PARAM_ERROR("7001", "参数错误"), PARAM_NOT_NULL("7002", "参数不能为空"); /** * 系统码 */ private String code; /** * 描述 */ private String desc; WebStatusEnum(String code, String desc) { this.code = code; this.desc = desc; } public static WebStatusEnum getWebStatusEnumByKey(String key){ for(WebStatusEnum bt : values()){ if(bt.getCode().equals(key) ) return bt; } return null; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } public String getCode() { return code; } public void setCode(String code) { this.code = code; } }
最后pom.xml文件,用于下载jar包的。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>email-server</artifactId> <groupId>com.basic.email</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>email-server-productor</artifactId> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <fastjson.version>1.2.6</fastjson.version> <commons-lang3.version>3.4</commons-lang3.version> <guava.version>18.0</guava.version> <javax.mail.version>1.4.7</javax.mail.version> <log4j.version>1.2.17</log4j.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>${commons-lang3.version}</version> </dependency> <dependency> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> <version>${javax.mail.version}</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.4</version> </dependency> <!-- log --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.0.1</version> </dependency> </dependencies> <build> <finalName>basic-model</finalName> <plugins> <!-- 编译插件 --> <!--编译插件设置--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <skip>true</skip> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
项目启动成功后,输入http://localhost:8080/email/sendEmail,
这个接口,每当成功一次就会在rabbitmq的管理页面上,看见队列里多一条信息。
最后就是成功了,用到的一些概念请自己查找,记忆会深刻些。
本文出自 “10093778” 博客,请务必保留此出处http://10103778.blog.51cto.com/10093778/1945756
以上是关于RabbitMq+Spring boot 消息生产者向队列发送消息 的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot (十三): Spring Boot 整合 RabbitMQ