spring cloudgradle父子项目微服务框架搭建---rabbitMQ延时队列

Posted 预立科技

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring cloudgradle父子项目微服务框架搭建---rabbitMQ延时队列相关的知识,希望对你有一定的参考价值。


总目录

https://preparedata.blog.csdn.net/article/details/120062997


文章目录

延时队列的配置是对上片文章的延伸扩展
https://preparedata.blog.csdn.net/article/details/128647139


一、rabbit延时插件下载

查看 RabbitMQ 服务对应的版本号, 示例是3.8.9

访问github,找到对应的插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

下载扩展名为ez的插件包即可


二、rabbit插件安装

以windows安装为例,将插件包直接放到安装目录plugins文件夹下
C:\\Program Files\\RabbitMQ Server\\rabbitmq_server-3.8.9\\plugins

打开dos窗口,直接启动插件

C:\\Users\\Administrator>rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启动命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后再重启rabbit服务

可以打开页面的rabbit管理端,Type类型已经多了一个x-delayed-message,说明插件安装成功


三、项目中配置延时队列

延时队列,新增延时类型声明,其他配置和即时消费的队列一样,即时消费的队列不需要此配置

cloud:
    stream:
        rabbit:
            bindings:
                # 订单-生产者
                orderDelayChannelOutput:
                producer:
                    delayed-exchange: true
                # 订单-消费者
                orderDelayChannelInput:
                consumer:
                    delayed-exchange: true

下面是完整配置

cloud:
    stream:
      # 绑定消息中间件的
      binders:
        # rabbit0000 别名, 同时可以绑定多个不同类型的消息中间件  rabbit0001、rabbit0002、kafka0001
        rabbit0000:
          # 声明类型,是rabbit
          type: rabbit
          environment:
            spring:
              rabbitmq:
                # guest连接登录时,需要使用 localhost
                host: localhost
                # 管理端页面端口是:15672, 配置服务时:5672
                port: 5672
                username: guest
                password: guest
                # 虚拟主机命名空间,"/", 默认虚拟主机,  可以自定义(例如dev、test、prod),区分环境
                virtual-host: /
      # 绑定消息通道,生产通道、消费通道
      bindings:
        # 订单-生产者-延时队列  自定义通道名称
        orderDelayChannelOutput:
          # 生产者和消费者 连接的桥梁 Queues的名称
          destination: shopping.order.create.delay
          # 配置组
          group: shopping-order
        # 订单-消费者-延时队列  自定义通道名称
        orderDelayChannelInput:
          destination: shopping.order.create.delay
          group: shopping-order
      rabbit:
        bindings:
          # 订单-生产者
          orderDelayChannelOutput:
            producer:
              delayed-exchange: true
          # 订单-消费者
          orderDelayChannelInput:
            consumer:
              delayed-exchange: true

四、定义消息通道

和即时消费的队列,是相同的

package com.pd.shopping.order.mq;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface Channels 
    /**
     * 订单-消息生产者
     */
    String ORDER_DELAY_OUTPUT = "orderDelayChannelOutput";
    @Output(ORDER_DELAY_OUTPUT)
    MessageChannel orderDelayChannelOutput();

    /**
     * 订单-消息消费者
     */
    String ORDER_DELAY_INPUT = "orderDelayChannelInput";
    @Input(ORDER_DELAY_INPUT)
    SubscribableChannel orderDelayChannelInput();


五、生成消息

package com.pd.shopping.order.controller;

import com.pd.shopping.order.model.bo.OrderMsgBo;
import com.pd.shopping.order.mq.Channels;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging

@RestController
@RequestMapping("/hello")
public class HelloController 
    @Autowired
    private Channels channels;

    @GetMapping("/testDelayedMq")
    public void testDelayedMq() 
        OrderMsgBo bo = new OrderMsgBo();
        bo.setId(1L);
        bo.setCode("aaa");

        //基础延时时间单位是毫秒
        Integer delayTime = 5 * 1000;

        Message<OrderMsgBo> message = MessageBuilder
                                    .withPayload(bo)
                                    .setHeader("x-delay", delayTime)
                                    .build();
        channels.orderDelayChannelOutput().send(message);

    

延时队列需要在消息头中添加"x-delay", 并且给定一个延时时间,单位毫秒

上面代码逻辑是,发送一条消息成功后,不会立即消费,消息通道的消息停留5秒,然后再去消费


六、监听消息,进行消费

和即时消费的队列,是相同的

package com.pd.shopping.order.mq;

import com.pd.shopping.order.model.bo.OrderMsgBo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@EnableBinding(Channels.class)
@Component
public class Listener 

    @StreamListener(Channels.ORDER_DELAY_INPUT)
    public void orderDelayInputListener(Message<OrderMsgBo> message) 
        OrderMsgBo orderMsgBo = message.getPayload();
        //todo 业务处理
    



以上是关于spring cloudgradle父子项目微服务框架搭建---rabbitMQ延时队列的主要内容,如果未能解决你的问题,请参考以下文章

使用http://start.spring.io/构建maven微服务项目的几个坑及eclipse构建spring boot微服务项目

使用 Spring Boot 和 Spring Cloud 的微服务项目结构

Spring Cloud 微服务项目实战 -

Spring Cloud 微服务项目实战 -

最全的spring微服务学习实战项目

Hystrix 概述与入门