Springboot 整合 阿里云消息队列RabbitMQ版服务

Posted 君燕尾

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Springboot 整合 阿里云消息队列RabbitMQ版服务相关的知识,希望对你有一定的参考价值。

因为公司的需要服务都是用的阿里云相关的产品,最近自己工作中也涉及到了消息队列这一块的业务,索性自己也来从零开始对接阿里云的消息队列服务。

准备

本着学习的前提,寻找是否免费的或者做活动的服务,能白嫖的就白嫖,果然被我找到了。

  1. 进入阿里云官方首页,找到精选活动->阿里云使用中心 点击进入

        2.进入页面搜索消息队列

        3.  具体队列的相关配置步骤可参考官方文档:快速入门概述 - 消息队列RabbitMQ版 - 阿里云

        4. 本来Rocket版、Kafka版都想学习的,但只有rabbit版的免费,但也够意思了毕竟不要钱(虽然免费但后面还留了一个很大的坑等着踩呢

开始

        1. 创建一个springboot项目 命名为:rabbitmq-aliyun

        2.在application.yml中配置相关mq常量(常量可从前面自己创建的阿里云mq控制台获取)

server:
  port: 8080


aliyun:
  rabbitmq:
    accessKey: 密匙key
    accessKeySecret: 密匙密码
    username: 静态用户名
    password:  静态密码
    vHost: 虚拟机名称
    exchange: 交换机名称
    exType: 交换机类型
    queue: 队列名称
    BindingKey:  路由key
    host: 介入点(公网接入点)

        :本地测试必须使用公网接入点  ,但是我们使用的免费rabbitMq服务并没有公网接入点,只有VPC接入点

 所以自己按照官方教程一步一步实现,总是连接超时,代码检查了无数遍也没有找到问题所在。(官方教程也没有表明用哪一个接入点地址,进了这个大坑)

最后只能需求官方客户帮助:

本着,不花钱的原则,但是使用VPC接入点 还得购买 阿里云ecs服务,岂不是还得花更多的钱。

最后只能升级服务,并且选择支持公网

 

所以说,白嫖也并没有完全白嫖,还得花钱,要么买ecs服务,要么升配队列服务

3.创建配置数据映射对象 RabbitMqConfigDTO.class

@Configuration
@ConfigurationProperties("aliyun.rabbitmq")
@Data
public class RabbitMqConfigDTO 

    /**
     * 账户密匙key
     */
    private String accessKey;

    /**
     * 账户密匙
     */
    private String accessKeySecret;

    /**
     *  静态用户名
     */
    private String username;

    /**
     * 静态用户名密码
     */
    private String password;

    /**
     * 虚拟机名称
     */
    private String vHost;

    /**
     * 交换机名
     */
    private String exchange;

    /**
     * 交换机类型
     */
    private String exType;

    /**
     * 队列名
     */
    private String queue;

    /**
     * 绑定规则key
     */
    private String BindingKey;

    /**
     * 接入点地址
     */
    private String host;

        4. 创建spring工具类 SpringContextHolder.class 用于获取bean对象

public class SpringContextHolder implements ApplicationContextAware 

    @Autowired
    private static ApplicationContext applicationContext;

    public SpringContextHolder() 
    

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException 
        SpringContextHolder.applicationContext = applicationContext;
    

    public static ApplicationContext getApplicationContext() 
        assertApplicationContext();
        return applicationContext;
    

    public static <T> T getBean(String beanName) 
        assertApplicationContext();
        return (T) applicationContext.getBean(beanName);
    

    public static <T> T getBean(Class<T> requiredType) 
        assertApplicationContext();
        return applicationContext.getBean(requiredType);
    

    private static void assertApplicationContext() 
        if (applicationContext == null) 
            throw new RuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!");
        
    

        5. 创建rabbitMq工具类  RabbitMqUtil.class

@Slf4j
@Component
public class RabbitMqUtil 
    
    @Autowired
    private RabbitMqConfigDTO rabbitMqConfigDTO;

    //第三步 建一个静态的本类
    private static RabbitMqUtil rabbitMqUtil;

    //第四步 初始化
    @PostConstruct
    public void init() 
        rabbitMqUtil = this;
    
    
    /**
     * 创建队列连接
     * @return
     */
    public static Connection getRabbitConnection()

        ConnectionFactory factory = new ConnectionFactory();

        //公网接入点
        factory.setHost(rabbitMqUtil.rabbitMqConfigDTO.getHost());
        //静态用户名
        factory.setUsername(rabbitMqUtil.rabbitMqConfigDTO.getUsername());
        //静态密码
        factory.setPassword(rabbitMqUtil.rabbitMqConfigDTO.getPassword());

        //自动恢复
        factory.setAutomaticRecoveryEnabled(true);
        //网络恢复时间
        factory.setNetworkRecoveryInterval(5000);
        //虚拟机名称
        factory.setVirtualHost(rabbitMqUtil.rabbitMqConfigDTO.getVHost());
        //端口
        factory.setPort(5672);
        //连接超时时间
        factory.setConnectionTimeout(30*100);
        //设置握手超时时间
        factory.setHandshakeTimeout(300000000);
        factory.setShutdownTimeout(0);

        //创建连接
        Connection connection = null;
        try 
            connection =factory.newConnection();

        catch (Exception e)
            log.error("rabbitMq连接异常", e);
        

        return connection;
    

    /**
     * 创建队列通道
     * @param connection
     * @return
     */
    public static Channel getRabbitChannel(Connection connection)

        Channel channel = null;
        try 
            channel = connection.createChannel();
            String exchange = rabbitMqUtil.rabbitMqConfigDTO.getExchange();
            channel.exchangeDeclare(exchange,rabbitMqUtil.rabbitMqConfigDTO.getExType(), true, false,false, null);
            channel.queueDeclare(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>());
            channel.queueBind(rabbitMqUtil.rabbitMqConfigDTO.getQueue(), rabbitMqUtil.rabbitMqConfigDTO.getExchange(), rabbitMqUtil.rabbitMqConfigDTO.getBindingKey());


        catch (Exception e)
            log.error("创建rabbitMq通道异常", e);
        

        return channel;
    


        6.创建server接口类

public interface RabbitMqService 

    /**
     * 发送mq消息
     * @return
     */
    String sendMessage() throws IOException, TimeoutException;

    /**
     * 消费消息
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    String consumeMessage() throws IOException, TimeoutException;

        7.创建实现类

@Service
public class RabbitMqServiceImpl implements RabbitMqService 

    @Autowired
    private RabbitMqConfigDTO rabbitMqConfigDTO;

    @Override
    public String sendMessage() throws IOException 

        Connection connection = RabbitMqUtil.getRabbitConnection();
        Channel channel = RabbitMqUtil.getRabbitChannel(connection);
        //开始发送消息
        for(int i=0; i< 10 ; i++)
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(rabbitMqConfigDTO.getExchange(), "BindingKey", true, props,
                    ("消息发送Body"  + i).getBytes(StandardCharsets.UTF_8));

        
        connection.close();
        return "消息发送成功";

    

    @Override
    public String consumeMessage() throws IOException, TimeoutException 

        Connection connection = RabbitMqUtil.getRabbitConnection();
        Channel channel = RabbitMqUtil.getRabbitChannel(connection);

        String exchange = rabbitMqConfigDTO.getExchange();
        channel.exchangeDeclare(exchange,rabbitMqConfigDTO.getExType(), true, false,false, null);
        channel.queueDeclare(rabbitMqConfigDTO.getQueue(), true, false, false, new HashMap<String, Object>());
        channel.queueBind(rabbitMqConfigDTO.getQueue(), rabbitMqConfigDTO.getExchange(), rabbitMqConfigDTO.getBindingKey());

        // 开始消费消息。
        channel.basicConsume(rabbitMqConfigDTO.getQueue(), false, "ConsumerTag", new DefaultConsumer(channel) 
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException 
                //接收到的消息,进行业务逻辑处理。
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            
        );
        connection.close();

        return "消费成功";
    
    

         8.创建控制层

@RestController
public class RabbitMqController 

    @Autowired
    private RabbitMqService rabbitMqService;

    @GetMapping("/sendMessage")
    public String sendMessage() throws IOException, TimeoutException 

        return rabbitMqService.sendMessage();
    

    @GetMapping("/consumeMessage")
    public String consumeMessage() throws IOException, TimeoutException 

        return rabbitMqService.consumeMessage();
    

        9.项目整体结构

        

     10.完成启动项目

     11.点击获取源码

测试 

  1. 发送消息

     2. 进入控制台查看 

 

         此时可以看到堆积10条消息,说明消息发送成功了

        3. 消费消息

         4.再次进入控制台查看

                堆积的消息已变为0说明消息已经被全部消费了

后序

自己遇到不懂的,比如前面本地测试需要公网接入点的问题,百度了不少博客,也没有找到sprigboot 整合 阿里云 消息队列RabbitMQ版的博客已经解决方案,所以博主按自己的实际操作记录下来,希望对同样遇到问题的小伙伴有所帮助。

以上是关于Springboot 整合 阿里云消息队列RabbitMQ版服务的主要内容,如果未能解决你的问题,请参考以下文章

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

startersspringboot-starter整合阿里云datahub

startersspringboot-starter整合阿里云datahub

SpringBoot整合RocketMQ

springboot系列-springboot整合RabbitMQ