spring cloud延时队列的使用

Posted mentalidade

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring cloud延时队列的使用相关的知识,希望对你有一定的参考价值。

  • 用户下单,需要在订单的有效截止时间前30分钟,提醒用户去使用。同时在到达有效截止时间,要将订单状态设置为失效。这时候可以用延时队列可以很好的解决,用户下单之后,计算出结束时间前半个小时的时长,发送一条延时消息提醒用户使用。订单结束的时长发送订单已经失效的消息。

入口

	/**
	 * 爆品助力状态提醒
	 *
	 * @param req 爆品助力失败
	 */
	@RequestMapping(path = "/mq/product/sendProductHelpStatusMessage", method = RequestMethod.POST)
	Integer sendProductHelpStatusMessage(@RequestBody HashMap<String,String> req);

生产者

    @Override
    public Integer sendProductHelpStatusMessage(@RequestBody HashMap<String,String> req){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        req.put("sendTime",sdf.format(new Date()));
        String beanToJson = JsonUtils.beanToJson(req);
        log.info("sendProductHelpStatusMessage:{}",beanToJson);
        productHelpStatusMessageChannel.productHelpStatusOutput().send(MessageBuilder.withPayload(beanToJson).setHeader("x-delay", req.get("delay")).build());
        return 1;
    }

将消息发送出去,延时delay毫秒,同时记录下消息发送的时间。这样就可以根据传递的参数来确定延时的具体时长。

消费者

package org.xxx.mq.provider.consumer.product;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.xxx.mq.api.channel.consumer.ProductHelpStatusMessageChannel;

import java.text.SimpleDateFormat;
import java.util.Date;

@Slf4j
@EnableBinding(value = {ProductHelpStatusMessageChannel.class})
public class ProductHelpStatusConsumer {
    @StreamListener(target = ProductHelpStatusMessageChannel.PRODUCT_HELP_STATUS_WX_INPUT)
    public void receiveProductHelpStatusWxMessage(String message){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.info("receiveProductHelpStatusWxMessage:{},receiveTime,{}",message,sdf.format(new Date()));

    }
}

接受消息,同时记录下接受消息的时间。

通道

package org.xxx.mq.api.channel.consumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;


public interface ProductHelpStatusMessageChannel {
    //爆品助力
    String PRODUCT_HELP_STATUS_OUTPUT = "productHelpStatusOutput";

    String PRODUCT_HELP_STATUS_WX_INPUT = "productHelpStatusWxInput";

    /**
     * 爆品助力消息发送通道
     *
     * @return
     */
    @Output(ProductHelpStatusMessageChannel.PRODUCT_HELP_STATUS_OUTPUT)
    MessageChannel productHelpStatusOutput();


    /**
     * 爆品助力消息订阅(微信消息)
     *
     * @return SubscribableChannel,消息订阅通道
     */
    @Input(ProductHelpStatusMessageChannel.PRODUCT_HELP_STATUS_WX_INPUT)
    SubscribableChannel getProductHelpStatusWxInputChannel();
}

配置application.yml

spring:
  datasource:
    hikari:
      maximum-pool-size: 50
      minimum-idle: 50
  cloud:
    stream:
      rabbit:
        bindings:
          #订阅通道 测试通道
          productHelpStatusOutput:
            producer:
              delayed-exchange: true
          productHelpStatusWxInput:
            consumer:
              auto-bind-dlq: true
              republishToDlq: true
              requeueRejected: true
              delayed-exchange: true
              dlq-ttl: ${queue.dlq.ttl}
              dlq-dead-letter-exchange:
      bindings:
        #生产者 爆品助力状态消息发送通道
        productHelpStatusOutput:
          destination: productHelpStatusExchange
          group: productHelpFailQueueGroup
        #消费者
        productHelpStatusWxInput:
          destination: productHelpStatusExchange
          group: productHelpStatusWxGroup
          consumer:
            max-attempts: 3
            backOffInitialInterval: 1000
            backOffMaxInterval: 10000
            backOffMultiplier: 2.0

需要配置等等。。。如上,
spring.cloud.stream.rabbit.bindings.productHelpStatusOutput.producer.delayed-exchange=true
spring.cloud.stream.bindings.productHelpStatusOutput.producer.delayed-exchange=true

测试

打开postman,请求接口,

{
    "delay": 10000,
    "orderSn":1
}

orderSn订单和delay延时时间(单位为毫秒)。请求orderSn=1的延时10秒delay=10000,orderSn=2的延时5秒delay=5000

2020-02-21 17:03:45.125  INFO [mq,b4e13b1dc86b0d7f,b4e13b1dc86b0d7f,true] 90220 --- [0.0-1205-exec-3] o.a.m.p.controller.ProductMqController   : sendProductHelpStatusMessage:{"delay":"10000","orderSn":"1","sendTime":"2020-02-21 17:03:45"}
2020-02-21 17:03:51.769  INFO [mq,820007cc648d3e43,820007cc648d3e43,true] 90220 --- [0.0-1205-exec-4] o.a.m.p.controller.ProductMqController   : sendProductHelpStatusMessage:{"delay":"5000","orderSn":"2","sendTime":"2020-02-21 17:03:51"}
2020-02-21 17:03:55.369  INFO [mq,b4e13b1dc86b0d7f,2b113702a181d49b,true] 90220 --- [StatusWxGroup-1] o.a.m.p.c.p.ProductHelpStatusConsumer    : receiveProductHelpStatusWxMessage:{"delay":"10000","orderSn":"1","sendTime":"2020-02-21 17:03:45"},receiveTime,2020-02-21 17:03:55
2020-02-21 17:03:56.847  INFO [mq,820007cc648d3e43,60f5a71795043ac2,true] 90220 --- [StatusWxGroup-1] o.a.m.p.c.p.ProductHelpStatusConsumer    : receiveProductHelpStatusWxMessage:{"delay":"5000","orderSn":"2","sendTime":"2020-02-21 17:03:51"},receiveTime,2020-02-21 17:03:56

如上订单1在 17:03:45发送消息,10秒后17:03:55消费者受到消息。订单2也在5秒后受到消息。

以上是关于spring cloud延时队列的使用的主要内容,如果未能解决你的问题,请参考以下文章

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

分布式系统的延时和故障容错之Spring Cloud Hystrix

spring cloudgradle父子项目微服务框架搭建---rabbitMQ延时队列

spring boot Rabbitmq集成,延时消息队列实现

防止 spring-cloud-aws-messaging 尝试停止队列