基于rabbitMQ 消息延时队列方案 模拟电商超时未支付订单处理场景

Posted joey-java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于rabbitMQ 消息延时队列方案 模拟电商超时未支付订单处理场景相关的知识,希望对你有一定的参考价值。

前言

传统处理超时订单
  • 采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好
  • 当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作

    jdk延迟队列 DelayQueue
  • 采取jdk自带的延迟队列能很好的优化传统的处理方案,但是该方案的弊、端也是非常致命的,所有的消息数据都是存于内存之中,一旦宕机或重启服务队列中数据就全无了,而且也无法进行扩展。
  • rabbitMQ延时队列方案
    rabbitmq我就不多介绍了,一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失

    术语 (详情请参照官网文档:http://www.rabbitmq.com/admin-guide.html

    存活时间(Time-To-Live 简称 TTL),分别有三种TTL的设置模式
  • x-message-ttl ,该属性是在创建队列的时候 ,在arguments的map中配置;该参数的作用是设置当前队列中所有的消息的存活时间
  • x-expires 该属性也是在arguments中配置;其作用是设置当前队列在N毫秒中(不能为0,且为正整数),就删除该队列;“未使用”意味着队列没有消费者,队列尚未重新声明,并且至少在有效期内未调用basicGet (basicGet 是手动拉取指定队列中的一条消息)
  • AMQP.BasicProperties配置中的exppiration 属性,前两者都是基于队列的TTL,该属性是基于单条消息的TLL用于配置每条消息在队列中的存活时间

    死信交换(Dead Letter Exchanges 简称 DLX)
  • ”死信交换“ 可以分开来理解 ;首先是 ”死信“,也就是死亡的信息,无效的信息;造成这样的信息有以下几种情况
    消息被拒绝,即消费者没有成功确认消息被消费
    消息TTL过期
    超出队列长度限制
    当出现这三种情况的时候,队列中的消息就会变为“死信”

  • 再来理解”交换“ 也就是说,当出现"死信"的情况下 rabbitmq 可以对该"死信"进行交换到别的队列上,但是交换的前提是需要为死信配置一个交换机用于死信的交换
    ?

代码实现

配置类 RabbitmqConfiguration
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * TODO rabbitmq配置类
 */  
public class RabbitmqConfiguration { 
    private final String SERVER_HOST="127.0.0.1";//rabbitmq 服务器地址
    private final int PORT=5672;//端口号 
    private final String USER_NAME="test";//用户名
    private final String PASSWORD="test";//密码
    private final boolean QUEUE_SAVE =true;//队列是否持久化
    private final String MESSAGE_SAVE = "1" ;//消息持久化  1,0 
    //rabbitmq 连接工厂
    private final ConnectionFactory RAB_FACTORY = new ConnectionFactory();
    private Connection connection;
  
    public void init() throws Exception{
        RAB_FACTORY.setHost(SERVER_HOST);
        RAB_FACTORY.setPort(PORT); 
        RAB_FACTORY.setUsername(USER_NAME);
        RAB_FACTORY.setPassword(PASSWORD);
        this.connection=RAB_FACTORY.newConnection();
    } 
    public Connection getConnection() {
        return connection;
    }
 
    public void setConnection(Connection connection) {
        this.connection = connection;
    }
 
    public boolean isQUEUE_SAVE() {
        return QUEUE_SAVE;
    }
 
    public String getMESSAGE_SAVE() {
        return MESSAGE_SAVE;
    }
        
}
功能类 OrderOverTimeQueue
import java.io.IOException; 
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
/**
 * TODO 超时未支付订单处理消息队列
 */
public class OrderOverTimeQueue {
    
    private RabbitmqConfiguration rabConf;
      
    //队列名称
    //****==================订单延时队列=======================*****//
    //订单延时队列
    public final String DELAY_QUEUE_NAME = "delay-queue-orderOverTime";
    //订单延时消费队列
    public final String CONSUME_QUEUE_NAME = "consume-queue-orderOverTime";
    //订单延时队列死信交换的交换器名称
    public final String EXCHANGENAME = "exchange-orderOverTime";
    //订单延时队列死信的交换器路由key
    public final String ROUTINGKEY = "routingKey-orderOverTime";
    
    private Channel delayChannel;//延时队列连接通道
    
    private Channel consumerChannel;//消费队列连接通道
    
    public void init() throws Exception{
        //创建连接通道
        delayChannel=rabConf.getConnection().createChannel();  
        consumerChannel=rabConf.getConnection().createChannel();  
        
        //创建交换器
        consumerChannel.exchangeDeclare(EXCHANGENAME,"direct"); 
        
        /**创建处理延时消息的延时队列*/
        Map <String,Object> arg = new HashMap <String,Object>();
        //配置死信交换器
        arg.put("x-dead-letter-exchange",EXCHANGENAME); //交换器名称
        //死信交换路由key (交换器可以将死信交换到很多个其他的消费队列,可以用不同的路由key 来将死信路由到不同的消费队列去)
        arg.put("x-dead-letter-routing-key", ROUTINGKEY); 
        delayChannel.queueDeclare(DELAY_QUEUE_NAME, rabConf.isQUEUE_SAVE(), false, false, arg);  
        
        /**创建消费队列*/
        consumerChannel.queueDeclare(CONSUME_QUEUE_NAME, rabConf.isQUEUE_SAVE(), false, false, null);
        //参数1:绑定的队列名  参数2:绑定至哪个交换器  参数3:绑定路由key
        consumerChannel.queueBind(CONSUME_QUEUE_NAME, EXCHANGENAME,ROUTINGKEY); 
        //最多接受条数 0为无限制,每次消费消息数(根据实际场景设置),true=作用于整channel,false=作用于具体的消费者
        consumerChannel.basicQos(0,10, false);
        
        //创建消费队列的消费者
        Consumer consumer = new DefaultConsumer(consumerChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) 
              throws IOException  {
                String message = new String(body, "UTF-8");
                try {
                    //业务逻辑处理
                    ConsumeMessage(message);  
                    //确认消息已经消费  参数2(true=设置后续消息为自动确认消费  false=为手动确认)
                    consumerChannel.basicAck(envelope.getDeliveryTag(), false);
                }catch (Exception e) { 
                }  
              }
            }; 
                
        boolean flag=false;//是否手动确认消息  true 是  false否 
        consumerChannel.basicConsume(CONSUME_QUEUE_NAME, flag, consumer);
    }
 
    /**
     * 方法描述: 发送延迟订单处理消息
     * @param msg 消息内容 (订单号或者json格式字符串)
     * @param overTime 消息存活时间
     * @throws Exception
     */
    public void sendMessage(String msg,Long overTime) throws Exception{ 
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration(overTime.toString()) //设置消息存活时间(毫秒)
                .build();
        delayChannel.basicPublish("",DELAY_QUEUE_NAME, properties, msg.getBytes("UTF-8"));
    }
    
    /**
     * 
     * 方法描述: 
     * 业务逻辑说明: TODO(总结性的归纳方法业务逻辑) 
     * @param msg 消费消息(订单号,或特定格式json字符串)
     * @throws InterruptedException
     */
    public void ConsumeMessage(String msg) throws InterruptedException { 
        Thread.sleep(50);//模拟业务逻辑处理
        System.out.println("处理到期消息时间=="+System.currentTimeMillis());
        System.err.println("删除订单 order-number  ==  "+msg);
    }
    
    
    public RabbitmqConfiguration getRabConf() {
        return rabConf;
    }
 
 
    public void setRabConf(RabbitmqConfiguration rabConf) {
        this.rabConf = rabConf;
    }
    
    public static void main(String[] args) throws Exception {
        OrderOverTimeQueue ooto=new OrderOverTimeQueue();
        RabbitmqConfiguration rf= new RabbitmqConfiguration();
        rf.init();
        ooto.setRabConf(rf);
        ooto.init();
 
        //模拟用户产生订单 消息生存时长为30秒
        ooto.sendMessage("20180907-order-number", 10000l);
        
        System.out.println("创建消息时间=="+System.currentTimeMillis());
        
        
        
    }
    
    
}
最终效果

技术图片

如果消息还存活的话,在延迟队列中的“ready”和“total”中都会存在相应的消息记录数

技术图片

写的比较粗糙 欢迎大家发表自己的观点 >_< !

以上是关于基于rabbitMQ 消息延时队列方案 模拟电商超时未支付订单处理场景的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ实现延时队列(死信队列)

RabbitMQ实现延时队列(死信队列)

RabbitMQ:伪延时队列

springboot使用RabbitMQ实现延时任务

RabbitMq延时队列,订单过期,取消支付场景

RabbitMQ的死信队列和延时队列