Redis进阶学习04---秒杀优化和消息队列

Posted 大忽悠爱忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis进阶学习04---秒杀优化和消息队列相关的知识,希望对你有一定的参考价值。

Redis进阶学习04---秒杀优化和消息队列


秒杀优化

如果一个饭店只有一个服务员,并且这个服务员不仅需要负责客人的点餐服务,还需要负责炒菜服务,显然这样的话,只能是先处理完第一个客人所有的点餐,烧菜任务后,才能去处理下一个客人的点餐,烧菜任务,这样显然把任务给串行化了,效率大大降低。

而现在我们就面临这样的问题:

目前整个秒杀的过程都是串行化执行的,并且这个流程里面涉及多次数据库查询操作,数据库查询是最耗费时间的,因此优化的思路就是把最耗费时间的数据库写操作转换为异步执行,然后把数据库查询操作通过redis查询替换掉,这样整体就分为了两部分,一部分是主线程去redis判断校验,然后如果判断和校验都通过了,就将消息放入一个队列中,异步线程从该队列中取出消息,然后去执行数据库写操作。


此时redis就相当于服务员,负责库存数量判断和重复购买校验,然后将合法的订单交易,放入队列中,异步处理线程,从队列读取消息,进行数据库写处理,即扣减库存,创建订单的耗时逻辑,全部异步完成。


显然,关于redis那部分判断逻辑,应该都由lua脚本来完成,而非java代码


秒杀优化的具体实现


1.新增优惠卷的同时,将优惠卷信息保存到Redis中

    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) 
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
        //保存优惠卷信息到Redis
        stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
    

测试:

2.lua脚本编写

-- 1.参数列表
-- 1.1 优惠卷id
local voucherId= ARGV[1]
-- 1.2 用户id
local userId= ARGV[2]

--2.数据key
--2.1库存key
local storeKey="seckill:stock:" .. voucherId
--2.2订单key
local orderKey="seckill:order:" .. voucherId

--3.脚本业务
--3.1判断库存是否充足 get storeKey
if(tonumber(redis.call('get',storeKey))<=0) then
    --3.2库存不足,返回1
    return 1
end

--3.2判断用户是否下单--set集合的判断方法,判断某个集合中是否存在某个value 
if(redis.call('sismember',orderKey,userId)==1) then
      --3.3存在,说明是重复下单,返回2
    return 2
end

--3.4扣库存incrby storeKey -1
redis.call('incrby',storeKey,-1)
--3.5下单(保存用户)sadd orderkey userId
redis.call('sadd',orderKey,userId)
return 0

3.修改抢购逻辑

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService 
    @Autowired
    private ISeckillVoucherService iSeckillVoucherService;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisWorker redisWorker;

     private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

     static 
         SECKILL_SCRIPT=new DefaultRedisScript<>();
         SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
         SECKILL_SCRIPT.setResultType(Long.class);
     

    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) 
        Long uid = UserHolder.getUser().getId();
        //1.执行lua脚本
        Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString());
        //2.判断结果是否为0
        int r=res.intValue();
        if(r!=0)
            return Result.fail(r==1?"库存不足":"不能重复下单");
        
         //3.为0,有购买资格,把下单信息保存到阻塞队列
        long order = redisWorker.nextId("order");
        //TODO:保存到阻塞队列
        //4.返回订单id
        return Result.ok(order);
    


当我们测试一下后:


此时数据库并无变化,因为我们还没把消息放入阻塞队列,从而通知异步线程去处理


4.异步线程处理阻塞队列中的消息

@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService 
    @Autowired
    private ISeckillVoucherService iSeckillVoucherService;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisWorker redisWorker;
    

     private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

     static 
         SECKILL_SCRIPT=new DefaultRedisScript<>();
         SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
         SECKILL_SCRIPT.setResultType(Long.class);
     


    /**
     * 阻塞队列
     */
    private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue(1024*1024);

    /**
     * 异步线程
     */
    private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();

     @PostConstruct
     public void init()
         SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
     

    public class VoucherOrderHandler implements Runnable
        @Override
        public void run() 
          while(true)
              //1.获取队列中的订单信息
              try 
                  //1.获取队列中的订单信息
                  VoucherOrder voucherOrder = orderTasks.take();
                  //2.创建订单
                  createVoucherOrder(voucherOrder);
               catch (InterruptedException e) 
                  log.error("订单创建异常: ",e);
              
          
        

        /**
         * 保守起见,还会再次进行判断
         */
        private void createVoucherOrder(VoucherOrder voucherOrder) 
            Long userId = voucherOrder.getUserId();
            Long voucherId = voucherOrder.getVoucherId();
            //创建锁对象
            RLock lock = redissonClient.getLock("lock:order:" + userId);
            //尝试获取分布式锁
            // 第一个参数为获取锁的最大等待时间(期间会重试)--默认-1,,失败直接返回
            //锁自动释放时间--默认30秒
            //时间单位
            boolean tryLock = lock.tryLock();
            if(!tryLock)
                log.error("用户["+userId+"]对"+voucherId+"优惠卷重复抢购");
            
            //我们只需要确保下面这两行代码的集群并发问题被解决
            try
                Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();

                if(count>0)
                    log.error("用户["+userId+"]对"+voucherId+"优惠卷重复抢购");
                
            finally 
                lock.unlock();
            

            //6.扣减库存
            boolean success = iSeckillVoucherService.update()
                    .setSql("stock = stock -1")
                    .eq("voucher_id", voucherId)
                    .gt("stock",0)
                    .update();
            if(!success)
               log.error("库存扣减失败");
            
            save(voucherOrder);
        
    

    @Override
    public Result seckillVoucher(Long voucherId) 
        Long uid = UserHolder.getUser().getId();
        //1.执行lua脚本
        Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString());
        //2.判断结果是否为0
        int r=res.intValue();
        if(r!=0)
            return Result.fail(r==1?"库存不足":"不能重复下单");
        
        //3.为0,有购买资格,把下单信息保存到阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        long orderId = redisWorker.nextId("order");
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(uid);
        voucherOrder.setVoucherId(voucherId);
        orderTasks.add(voucherOrder);
        //4.返回订单id
        return Result.ok(orderId);
    



基于jdk阻塞队列完成的秒杀优化总结


阻塞队列里面数据过多可能会导致jvm内存溢出,还有就是即便设置了阻塞队列最大元素个数上限也有弊端,就是如果元素过多,处理速度跟不上,会导致很多额外任务放入阻塞队列失败

还有就是数据都是存放在内存中的,一旦java程序出现异常,那么内存中的任务将会全部丢失,并且一旦出现异常,也会导致某个任务执行失败


Redis消息队列实现秒杀


基于Redis的List实现消息队列


基于Redis的PubSub实现消息队列



基于Stream实现消息队列



默认是非阻塞的,并且如果阻塞时长传入0,表示无限等待



基于Stream的消息队列之消费者组




Redis-Stream详解

相信各位光看上面的介绍,应该对Stream还是一知半解,下面我来详细介绍一下它的用法:

追加新消息,XADD,生产消息

XADD,命令用于在某个stream(流数据)中追加消息,演示如下:

127.0.0.1:6379> XADD memberMessage * user kang msg Hello
"1651325244694-0"
127.0.0.1:6379> XADD memberMessage * user zhong  msg nihao
"1651325256282-0"

其中语法格式为:

XADD key ID field string [field string …]

需要提供key,消息ID方案,消息内容,其中消息内容为key-value型数据。

ID,最常使用*,表示由Redis生成消息ID,这也是强烈建议的方案。

field string [field string], 就是当前消息内容,由1个或多个key-value构成。

上面的例子中,在memberMemsages这个key中追加了user kang msg Hello这个消息。Redis使用毫秒时间戳和序号生成了消息ID。此


从消息队列中获取消息,XREAD,消费消息

XREAD,从Stream中读取消息,演示如下:

127.0.0.1:6379> XREAD streams memberMessage 0
1) 1) "memberMessage"
   2) 1) 1) "1651325244694-0"
         2) 1) "user"
            2) "kang"
            3) "msg"
            4) "Hello"
      2) 1) "1651325256282-0"
         2) 1) "user"
            2) "zhong"
            3) "msg"
            4) "nihao"


消息被读取后,并不会从stream队列中消失,这点需要注意

上面的命令是从消息队列memberMessage中读取所有消息。XREAD支持很多参数,语法格式为:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

其中:

  • [COUNT count],用于限定获取的消息数量
  • [BLOCK milliseconds],用于设置XREAD为阻塞模式,默认为非阻塞模式
  • ID,用于设置由哪个消息ID开始读取。使用0表示从第一条消息开始。(本例中就是使用0)此处需要注意,消息队列ID是单调递增的,所以通过设置起点,可以向后读取。在阻塞模式中,可以使用 $ 表 示 最 新 的 消 息 I D 。 ( 在 非 阻 塞 模 式 下,表示最新的消息ID)。(在非阻塞模式下无意义)。

XRED读消息时分为阻塞和非阻塞模式,使用BLOCK选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。

一个典型的阻塞模式用法为:

127.0.0.1:6379> XREAD block 1000 streams memberMessage $
(nil)
(1.07s)

我们使用Block模式,配合$作为ID,表示读取最新的消息,若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。

因此,典型的队列就是 XADD 配合 XREAD Block 完成。XADD负责生成消息,XREAD负责消费消息。


消息ID说明

XADD生成的1553439850328-0,就是Redis生成的消息ID,由两部分组成:时间戳-序号。时间戳是毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型(int64)。序号是在这个毫秒时间点内的消息序号,它也是个64位整型。较真来说,序号可能会溢出,but真可能吗?

可以通过multi批处理,来验证序号的递增:

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> XADD memberMessage * msg one
QUEUED
127.0.0.1:6379> XADD memberMessage * msg two
QUEUED
127.0.0.1:6379> XADD memberMessage * msg three
QUEUED
127.0.0.1:6379> XADD memberMessage * msg four
QUEUED
127.0.0.1:6379> XADD memberMessage * msg five
QUEUED
127.0.0.1:6379> EXEC
1) "1553441006884-0"
2) "1553441006884-1"
3) "1553441006884-2"
4) "1553441006884-3"
5) "1553441006884-4"

由于一个redis命令的执行很快,所以可以看到在同一时间戳内,是通过序号递增来表示消息的。

为了保证消息是有序的,因此Redis生成的ID是单调递增有序的。由于ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis的每个Stream类型数据都维护一个latest_generated_id属性,用于记录最后一个消息的ID。若发现当前时间戳退后(小于latest_generated_id所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质。

强烈建议使用Redis的方案生成消息ID,因为这种时间戳+序号的单调递增的ID方案,几乎可以满足你全部的需求。但同时,记住ID是支持自定义的,别忘了!


消费者组模式,consumer group

当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是消息队列中有10条消息,三个消费者都可以消费到这10条消息。

但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有10条消息,三个消费者分别消费其中的某些消息,比如消费者A消费消息1、2、5、8,消费者B消费消息4、9、10,而消费者C消费消息3、6、7。也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。

即消费者组模式可以让多个消费者协同合作,来共同消息队列中的消息,提高队列中消息的消费效率

消费者组模式的支持主要由两个命令实现:

  • XGROUP,用于管理消费者组,提供创建组,销毁组,更新组起始消息ID等操作
  • XREADGROUP,分组消费消息操作

进行演示,演示时使用5个消息,思路是:创建一个Stream消息队列,生产者生成5条消息。在消息队列上创建一个消费组,组内三个消费者进行消息消费:

# 生产者生成10条消息
127.0.0.1:6379> MULTI
127.0.0.1:6379> XADD mq * msg 1 # 生成一个消息:msg 1
127.0.0.1:6379> XADD mq * msg 2
127.0.0.1:6379> XADD mq * msg 3
127.0.0.1:6379> XADD mq * msg 4
127.0.0.1:6379> XADD mq * msg 5
127.0.0.1:6379> EXEC
 1) "1553585533795-0"
 2) "1553585533795-1"
 3) "1553585533795-2"
 4) "1553585533795-3"
 5) "1553585533795-4"

# 创建消费组 mqGroup
127.0.0.1:6379> XGROUP CREATE mq mqGroup 0 # 为消息队列 mq 创建消费组 mgGroup
OK

# 消费者A,消费第1127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 streams mq > #消费组内消费者A,从消息队列mq中读取一个消息
1) 1) "mq"
   2) 1) 1) "1553585533795-0"
         2) 1) "msg"
            2) "1"
# 消费者A,消费第2127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-1"
         2) 1) "msg"
            2) "2"
# 消费者B,消费第3127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerB COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-2"
         2) 1) "msg"
            2) "3"
# 消费者A,消费第4127.0.0.1:6379MQ 死信队列/延迟队列-( 商品秒杀后30分钟之内付款)

Redis之秒杀下单优化以及认识redis消息队列

Redis之秒杀下单优化以及认识redis消息队列

Redis之消息队列实现

Redis基于(ListPubSubStream消费者组)实现消息队列,基于Stream结构实现异步秒杀下单

Redis分布式缓存秒杀