RabbitMq+Spring boot 消息生产者向队列发送消息

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq+Spring boot 消息生产者向队列发送消息 相关的知识,希望对你有一定的参考价值。


本人学习新框架的方法:

一、先学习框架的基本知识,比如看一本相关书籍的前三章,了解基本概念。以RabbitMQ为例,我会先了解交换机(exchange)、路由键(routing key)、队列(queue)、虚拟主机(virtual host)等概念。

二、然后写代码,写demo,有哪些不懂的地方直接再去翻书或者google找资料,带着问题去学习,学的更快更扎实一些。

三、然后再看这个框架的应用场景,自己能否独立的写一些简单的项目,来验证自己的成果。

四、实际项目积累经验。


RabbitMq 消息生产者向队列发送消息 (一)

MQ分为消息生产者和消息消费者,这次做的主要是消息的生产者的讲述。就是发送消息到相应的队列上


本文是用spring boot 来做的。


步骤1:

创建RabbitMqConfig配置类,里面配置了队列、连接工厂、交换机等信息。

package com.basic.rabbitmq.productor.config;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;

import javax.swing.event.ChangeEvent;

/**
 * Rabbitmq配置类
 *
 * 配制一些用户名和密码,还有就是配制队列,交换机,还有路由健
 * Created by sdc on 2017/7/4.
 */
@Configuration
@ComponentScan(basePackages = {"com.basic.rabbitmq.productor"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMqConfig {

    @Autowired
    private Environment env;

    /**
     * 构建connectionfactory
     * @return
     * @throws Exception
     */
    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(env.getProperty("spring.rabbitmq.host"));
        connectionFactory.setPort(Integer.valueOf(env.getProperty("spring.rabbitmq.port").trim()));
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
        connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));

//        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * CachingConnectionFactory
     * @return
     * @throws Exception
     */
    @Bean
    public CachingConnectionFactory cachingConnectionFactory() throws Exception {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory());
        /** 如果要进行消息回调,则这里必须要设置为true */
        cachingConnectionFactory.setPublisherConfirms(true); // 必须要设置

        return cachingConnectionFactory;
    }

    /**
     * RabbitTemplate,类似于jdbctemplate一样的工具类
     * @return
     * @throws Exception
     */
    @Bean
    public RabbitTemplate rabbitTemplate() throws  Exception {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
//        rabbitTemplate.setChannelTransacted(true);
        //这个设置参数
//        rabbitTemplate.setMandatory(true);

//        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback());
//        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback());
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin() throws  Exception {
        return new RabbitAdmin(cachingConnectionFactory());
    }

    /**
     * 通道处理
     * @return
     * @throws Exception
     */
    @Bean
    public Channel channel() throws Exception {
        //队列名字
        String name = env.getProperty("emial.server.queue").trim();
        // 是否持久化
        boolean durable = StringUtils.isNotBlank(env.getProperty("emial.server.queue.durable").trim())?
                Boolean.valueOf(env.getProperty("emial.server.queue.durable").trim()) : true;

        // 仅创建者可以使用的私有队列,断开后自动删除
        boolean exclusive = StringUtils.isNotBlank(env.getProperty("emial.server.queue.exclusive").trim())?
                Boolean.valueOf(env.getProperty("emial.server.queue.exclusive").trim()) : false;

        // 当所有消费客户端连接断开后,是否自动删除队列
        boolean autoDelete = StringUtils.isNotBlank(env.getProperty("emial.server.queue.autoDelete").trim())?
                Boolean.valueOf(env.getProperty("emial.server.queue.autoDelete").trim()) : false;

        ConnectionFactory connectionFactory = connectionFactory();
        Connection connection = connectionFactory.newConnection();
        Channel channes = connection.createChannel();
        channes.queueDeclare(name, durable, exclusive, autoDelete, null);
        return channes;
    }

    /**
     * 队列
     * @return
     */
    @Bean
    public Queue queue() throws  Exception{
        //队列名字
        String name = env.getProperty("emial.server.queue").trim();
        // 是否持久化
        boolean durable = StringUtils.isNotBlank(env.getProperty("emial.server.queue.durable").trim())?
                Boolean.valueOf(env.getProperty("emial.server.queue.durable").trim()) : true;

        // 仅创建者可以使用的私有队列,断开后自动删除
        boolean exclusive = StringUtils.isNotBlank(env.getProperty("emial.server.queue.exclusive").trim())?
                Boolean.valueOf(env.getProperty("emial.server.queue.exclusive").trim()) : false;

        // 当所有消费客户端连接断开后,是否自动删除队列
        boolean autoDelete = StringUtils.isNotBlank(env.getProperty("emial.server.queue.autoDelete").trim())?
                Boolean.valueOf(env.getProperty("emial.server.queue.autoDelete").trim()) : false;

        return new Queue(name, durable, exclusive, autoDelete);
    }

    /**
     * 配制交换机,交换机类型为topic
     * @return
     */
    @Bean
    public TopicExchange exchange () {
        //交换机的名字
        String name = env.getProperty("emial.server.exchange");
        // 是否持久化
        boolean durable = StringUtils.isNotBlank(env.getProperty("emial.server.exchange.durable").trim())?
                Boolean.valueOf(env.getProperty("emial.server.exchange.durable").trim()) : true;
        // 当所有消费客户端连接断开后,是否自动删除队列
        boolean autoDelete = StringUtils.isNotBlank(env.getProperty("emial.server.exchange.autoDelete").trim())?
                Boolean.valueOf(env.getProperty("emial.server.exchange.autoDelete").trim()) : false;
        //创建交换机
        return new TopicExchange(name, durable, autoDelete);
    }

    /**
     * 绑定,交换机要绑定要队列上,交换机才能把消息放入到相应的队列上。
     * @return
     */
    @Bean
    public Binding binding() throws  Exception{
        String routekey = env.getProperty("emial.server.routekey").trim();
        return BindingBuilder.bind(queue()).to(exchange()).with(routekey);
    }

}

步骤二配置文件:

application.properties


#指定具体使用哪种配置环境,此处指定使用application-dev.properties配置文件中的环境
spring.profiles.active=dev


application-dev.properties

#spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

emial.server.exchange=email_exchange
emial.server.exchange.durable=true
emial.server.exchange.autoDelete=false

emial.server.queue=email_queue
emial.server.queue.durable=true
emial.server.queue.exclusive=false
emial.server.queue.autoDelete=false

emial.server.routekey=email_route_key
emial.server.exchange.bindingkey=email_route_key


具体接口EmailService服务:

package com.basic.rabbitmq.productor.service;

/**
 * Email service contract.
 * Created by sdc on 2017/7/5.
 */
public interface EmailService {

    /**
     * Stores an email-sending task in the message queue.
     *
     * @param message the message payload to enqueue
     * @throws Exception if the implementation fails to hand the message over
     */
    void sendEmailForQueue(String message) throws Exception;

}
package com.basic.rabbitmq.productor.service.impl;

import com.basic.rabbitmq.productor.model.SendMessage;
import com.basic.rabbitmq.productor.service.EmailService;
import com.basic.rabbitmq.productor.util.MessageSender;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * 邮件服务
 * Created by sdc on 2017/7/5.
 */
@Service("emailService")
public class EmailServiceImpl implements EmailService {

    private static Logger logger = LoggerFactory.getLogger(EmailServiceImpl.class);

//    @Resource( name = "rabbitTemplate" )
//    private RabbitTemplate rabbitTemplate;

    @Value("${emial.server.exchange}")
    private String exchange;

    @Value("${emial.server.routekey}")
    private String routeKey;

    @Resource(name = "messageSender")
    private MessageSender messageSender;

    @Override
    public void sendEmailForQueue(String message) throws Exception {
        try {
//            rabbitTemplate.convertAndSend(exchange, routeKey, message);
            messageSender.handlerMessage(message);
        }catch (Exception e){
//            logger.error("EmailServiceImpl.sendEmail", ExceptionUtils.getMessage(e));
            e.printStackTrace();
        }
    }

}


Controller层,写一个接口,往Rabbitmq上发送消息。

package com.basic.rabbitmq.productor.controller;

import com.basic.rabbitmq.productor.constant.WebStatusEnum;
import com.basic.rabbitmq.productor.model.ResponseVo;
import com.basic.rabbitmq.productor.service.EmailService;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;

import javax.annotation.Resource;

/**
 * 接口测试类
 *
 * Created by sdc on 2017/7/5.
 */
@RestController
@RequestMapping(value = "/email")
public class EmailController extends BaseController{

    @Resource(name = "emailService")
    private EmailService emailService;

    /**
     * 发供邮件服务
     * @return
     * @throws Exception
     */
    @RequestMapping(value="/sendEmail", method = RequestMethod.GET)
    public ResponseVo<?> sendEmail() throws Exception {
        String emailMessage = "邮件消息";
        emailService.sendEmailForQueue(emailMessage);
        return generateResponseVo(WebStatusEnum.SUCCESS, "");
    }

}


EmailController的父类,单独抽取出来,供其他controller类继承。

package com.basic.rabbitmq.productor.controller;


import com.basic.rabbitmq.productor.constant.WebStatusEnum;
import com.basic.rabbitmq.productor.model.ResponseVo;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;

/**
 * 提取出来一个类,用来作为controller层的父类,统一的返回格式。
 *
 * Created by sdc on 2017/7/6.
 */
public class BaseController {
    /**
     * 生成统一的返回响应对象
     *
     * @param webStatusEnum 状态码枚举
     * @param data 数据对象
     * @param <T> 数据对象类型参数
     * @return
     */
    public <T> ResponseVo generateResponseVo(WebStatusEnum webStatusEnum, T data) {
        return new ResponseVo(webStatusEnum.getCode(), webStatusEnum.getDesc(), data);
    }

    /**
     * 获取当前会话
     *
     * @param request 请求
     * @return httpSession
     */
    public HttpSession getCurrentSession(HttpServletRequest request) {
        return request.getSession();
    }


}


统一返回对象ResponseVo和状态码枚举WebStatusEnum,用于给页面返回统一格式的结果:

package com.basic.rabbitmq.productor.model;

import java.io.Serializable;

/**
 * Unified response envelope shared with the front end.
 *
 * Created by sdc on 2017/7/7.
 *
 * @param <T> type of the payload
 */
public class ResponseVo<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Status code. */
    private String code;

    /** Message that corresponds to the status code. */
    private String message;

    /** Payload object. */
    private T data;

    /**
     * Creates a response with all three parts set.
     *
     * @param code status code
     * @param message message for the status code
     * @param data payload object
     */
    public ResponseVo(String code, String message, T data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }

    /** @return the status code */
    public String getCode() {
        return this.code;
    }

    /** @return the message for the status code */
    public String getMessage() {
        return this.message;
    }

    /** @return the payload object */
    public T getData() {
        return this.data;
    }

    /** @param code the status code to store */
    public void setCode(String code) {
        this.code = code;
    }

    /** @param message the message to store */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @param data the payload object to store */
    public void setData(T data) {
        this.data = data;
    }
}
package com.basic.rabbitmq.productor.constant;

/**
 * Status codes returned by the web interfaces.
 *
 * Created by sdc on 2017/7/7.
 */
public enum WebStatusEnum {

    /**
     * Interface return status codes.
     *
     * Range for common codes: 5000 +
     * Range for business codes: 2000 to 4000
     */
    SUCCESS("5000", "成功"),
    FAILED("7000", "失败"),

    PARAM_ERROR("7001", "参数错误"),
    PARAM_NOT_NULL("7002", "参数不能为空");

    /** System code. */
    private String code;

    /** Description. */
    private String desc;

    WebStatusEnum(String code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    /**
     * Looks up the first constant, in declaration order, whose code equals the key.
     *
     * @param key the code to search for
     * @return the matching constant, or {@code null} when none matches
     */
    public static WebStatusEnum getWebStatusEnumByKey(String key) {
        return java.util.Arrays.stream(values())
                .filter(status -> status.getCode().equals(key))
                .findFirst()
                .orElse(null);
    }

    /** @return the system code */
    public String getCode() {
        return this.code;
    }

    /** @param code the system code to store */
    public void setCode(String code) {
        this.code = code;
    }

    /** @return the description */
    public String getDesc() {
        return this.desc;
    }

    /** @param desc the description to store */
    public void setDesc(String desc) {
        this.desc = desc;
    }
}


最后pom.xml文件,用于下载jar包的。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Producer module; inherits from the email-server parent POM. -->
    <!-- Dependencies below without a <version> rely on dependency management that is
         not declared in this file (presumably in the parent; confirm). -->
    <parent>
        <artifactId>email-server</artifactId>
        <groupId>com.basic.email</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>email-server-productor</artifactId>

    <!-- Versions for the dependencies that are pinned explicitly in this module. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <fastjson.version>1.2.6</fastjson.version>
        <commons-lang3.version>3.4</commons-lang3.version>
        <guava.version>18.0</guava.version>
        <javax.mail.version>1.4.7</javax.mail.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <dependencies>
        <!-- Spring Boot starters: logging, web, test (test scope) and AMQP. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>

        <!-- General-purpose libraries. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>

        <dependency>
            <groupId>javax.mail</groupId>
            <artifactId>mail</artifactId>
            <version>${javax.mail.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.4</version>
        </dependency>

        <!-- log -->
        <!-- NOTE(review): slf4j-log4j12 is declared alongside spring-boot-starter-logging;
             if the starter is not excluded elsewhere this may put more than one SLF4J
             binding on the classpath. Confirm against the parent POM. -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>

        <!-- Servlet API, used by BaseController (HttpServletRequest / HttpSession). -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.0.1</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>basic-model</finalName>
        <plugins>
            <!-- Compiler plugin -->
            <!-- Compiler plugin settings: Java 1.8 source and target. -->
            <!-- NOTE(review): per the maven-compiler-plugin docs, `skip` appears to be the
                 testCompile parameter that bypasses compiling test sources. Confirm this
                 is intentional. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>


项目启动成功后,输入http://localhost:8080/email/sendEmail,

这个接口,每当成功一次就会在rabbitmq的管理页面上,看见队列里多一条信息。


技术分享


最后就是成功了,用到的一些概念请自己查找,记忆会深刻些。



本文出自 “10093778” 博客,请务必保留此出处http://10103778.blog.51cto.com/10093778/1945756

以上是关于RabbitMq+Spring boot 消息生产者向队列发送消息 的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot (十三): Spring Boot 整合 RabbitMQ

Spring Boot 入门:集成RabbitMQ消息队列

消息队列MQ——Spring Boot整合RabbitMQ

译Spring官方教程:Spring Boot整合消息中间件RabbitMQ

Spring Boot (25) RabbitMQ消息队列

spring boot Rabbitmq集成,延时消息队列实现