利用Redis一步步实现优惠券的最终秒杀方案

Posted pshdhx_albert

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用Redis一步步实现优惠券的最终秒杀方案相关的知识,希望对你有一定的参考价值。

订单ID不能采用自增长的原因:

1、规律变化太明显。两天下单的ID的差值,能够计算出商城的订单量;

2、如果采用自增长,订单数据是会不断产生的,到时候要分表,但是每个表的ID都是从0开始增长的,这样ID就重复了。

全局ID生成器:

分布式系统环境下,用来生成全局唯一ID的工具。

1、唯一性;有个increment的特性;

2、高可用;能搭建集群

3、高性能;基于内存,效率高

4、递增型;【有利于数据库创建索引,提高数据库的查询速度】

5、安全性;

符号位:1bit,永远是0,代表正数;

时间戳:31bit,以秒为单位,可以使用69年;

序列号:32bit(Redis的递增值) 支持每秒产生2^32个ID【42亿】

Redis实现全局唯一生成器

package com.hmdp.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

@Component
public class RedisIdWorker 
    /**
     * 开始时间戳
     */
    private static final long BEGIN_TIMESTAMP = 1676037454L;
    /**
     * 序列号的位数【32个比特位】 2^32=40亿
     */
    private static final int COUNT_BITS = 32;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    public long nextId(String keyPrefix) 
        // 1.生成时间戳【秒时间戳】
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;

        // 2.生成序列号
        // 2.1.获取当前日期,精确到天
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        // 2.2.自增长 【大概是每天40亿的上限】
        long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);

        // 3.拼接并返回 【0 永远是第一位 31位的时间戳 32位的订单ID自增长】
        return timestamp << COUNT_BITS | count;
    


有些优惠券需要买,比如说美团红包。8元人民币买10元的红包。

​ tb_voucher:优惠券的基本信息,优惠金额、使用规则等。

有些优惠券需要抢【秒杀】,比如说政府为刺激消费,发放的汽车消费券,是有限的。

​ tb_seckill_voucher:优惠券的库存开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息。

抢购秒杀券时,需要判断

1、秒杀是否开始和结束

2、库存是否充足

【1、根据前端提交的优惠券id,获取优惠券信息】

【2、如果秒杀场景正确,可以考虑减库存。这里应该要使用到事务】

【3、创建订单信息给前端,让其支付】

基本下单和秒杀下单

**【超卖】**使用JMeter压测后,库存本来是100的,结果成了负数。新建的订单量竟然大于库存量,这就是在高并发环境的出现的情况。

需要加锁来解决。

1、悲观锁

认为线程安全问题一定会发生,因此在操作数据之前,先获取锁,确保线程串行执行。

例如:Synchronized、Lock都属于悲观锁。性能差了点,高并发环境下并不是很适合。

2、乐观锁【更新数据】

认为线程安全问题不一定会发生。只有在数据更新时,才会判断有没有其他线程对数据进行了修改。如果没有修改,则认为是安全的,自己才更新数据;如果数据已经被其他线程修改,则说明发生了安全性问题,此时可以重试或者是抛出异常。【对于要修改的数据,有一个版本号,如果查询出来的版本和where 筛选的版本号码一致,则可以进行修改,实际上是不加锁的。】

用数据本身有没有变化,来作为是否修改的条件。版本号,用数据本身来代替,简化了操作。CAS方案

弊端:虽然没有发生了超卖,但是优惠券抢购,只发生了21次,但是一共是100的库存量呀。

原因:多线程条件下,库存量快速变化,导致的其它线程,发生了扣减失败的情况,但是不出错。【成功率太低,没有业务上的安全问题】

where id = ? and stock = ? ==========》 where id = ? and stock > 0

防止请求对数据库的压力

一人一单,规避黄牛【新增数据】

做一个查询,如果表中存在,就不允许下单了。

出现的问题:并发环境下,库存竟然少了10,订单量一个人竟然有10单。虽然做了一人一单的判断,但是多线程环境下不管用。

还是那个并发安全问题,只能使用悲观锁方案。

从查询订单,到判断,到新增,做一个封装。

应该是先提交事务之后,再进行锁的释放。

如果先进行锁的释放,事务如果没有提交的话,下一个线程来查询时,还是出问题。

this.createVouther();this拿到的是service的对象,而spring的事务要想生效,是对当前service类对象做了代理,用代理对象做了事务处理。

所以使用类对象,可能会使得事务不生效。

1、添加依赖aspectjweaver

2、@EnableAspectJAutoProxy(exposeProxy = true);

通过加锁,可以解决在单体项目下的问题,那么如何解决多实例下的并发安全问题呢?

多体项目并发安全性问题



选择好左下角的服务,ctrl+D之后,修改端口-Dserver.port=8082

Redis的分布式锁实现多实例并发安全

分布式锁:满足分布式系统或集群模式下多线程可见,并且互斥的锁。

多进程可见:【独立于JVM】

互斥:【只有一个人能获取到】

高可用:【不能获取锁的动作经常出问题】

高性能:【加锁本身呢,会影响业务的性能,串行执行会变慢】

安全性:【锁获取了,异常挂了怎么办,产生死锁怎么处理呢】

Redis实现最简单的分布式锁

利用redis作为第三方中间件,给分布式项目的服务加锁。

public interface ILock 

    /**
     * 尝试获取锁
     * @param timeoutSec 锁持有的超时时间,过期后自动释放
     * @return true代表获取锁成功; false代表获取锁失败
     */
    boolean tryLock(long timeoutSec);

    /**
     * 释放锁
     */
    void unlock();

package com.pshdhx.utils;

import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock 

    private String name;
    private StringRedisTemplate stringRedisTemplate;
    private static final String KEY_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) 
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    

    @Override
    public boolean tryLock(long timeoutSec) 
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    

    //删除自己线程的锁,不能因为本线程阻塞处理完成后(自己的锁过期了),删除别的线程的锁。
    @Override
    public void unlock() 
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁中的标示
        String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        // 判断标示是否一致
        if(threadId.equals(id)) 
            // 释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        
    

直接使用线程的ID作为锁的值,是不合适的,线程ID值是递增的,因为多个JVM的线程号可能会相同。

Redis的分布式锁的原子性问题

如果在释放锁的过程中,发生了FullGC,然后释放锁的过程被阻塞,该锁超时自动释放了。则其余线程能够正常获取锁,此时阻塞的线程恢复了,把其余线程获取的锁给释放了,所以要保证释放锁的原子性。

Redis的事务可以保证其原子性,但是无法保证其一致性。而且事务里边的多个操作,是个批处理,是最终一次性执行。

所以使用Lua脚本来执行。

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法可以参考

www.runoob.com/lua/lua-tutorial.html

在Shell中执行:

EVAL “return redis.call(‘set’,‘name’,‘jack’)” 0个参数

EVAL “return redis.call(‘set’,KEYS[1],ARGV[1])” 1 name Rose

unlock.lua 放入到resources里边

-- 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) ==  ARGV[1]) then
    -- 释放锁 del key
    return redis.call('del', KEYS[1])
end
return 0

改进Redis的分布式锁

需求:基于Lua脚本实现分布式锁的释放锁逻辑

提示:RedisTemplate调用Lua脚本的api如下:

public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) 
    return this.scriptExecutor.execute(script, keys, args);

package com.pshdhx.utils;

import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock 

    private String name;
    private StringRedisTemplate stringRedisTemplate;

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) 
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    

    private static final String KEY_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static 
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    

    @Override
    public boolean tryLock(long timeoutSec) 
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    

    @Override
    public void unlock() 
        // 调用lua脚本
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId());
    
    /*@Override
    public void unlock() 
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁中的标示
        String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        // 判断标示是否一致
        if(threadId.equals(id)) 
            // 释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        
    */

核心代码

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static 
    UNLOCK_SCRIPT = new DefaultRedisScript<>();
    UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    UNLOCK_SCRIPT.setResultType(Long.class);

 @Override
    public void unlock() 
        // 调用lua脚本
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId());
    

不可重入锁

方法A获取了锁,然后去调用方法B,此时,方法B也想要获取锁,但是无法获取了。

不可重入:同一个线程无法多次获取同一把锁。

不可重试:尝试锁只尝试获取一次就返回false,没有重试机制。

超时释放:锁超时释放虽然可以避免死锁,但是如果业务执行耗时长,也会导致锁的释放,存在一定的安全隐患。

主从一致性【读写分离】:如果Redis提供了主从集群,主从同步存在延迟。在主节点set操作获取了锁,尚未同步到从节点,突然主节点宕机,选择新的从节点作为主,但是从节点没有锁,所以新的线程会重新set锁。

Redisson

提供了一系列分布式的常用对象,还提供了许多分布式服务,其中就包括了各种分布式锁的实现。

1、可重入锁

2、公平锁

3、联锁

4、红锁

5、读写锁

6、信号量

7、可过期性信号量

8、闭锁

可重入锁原理:

使用了哈希值的方式,进行可重入锁的是设计。但凡是在一个线程之中,无论是里边有多少个业务方法要获取锁,只管将对应的value值加一即可;如果需要释放锁,则将value值减一即可。如果减到了0,则进行锁的删除操作。

value值不断的增加,只要是同一个线程想要获取锁,value值就+1,【记得重置下有效期】

释放锁操作,就减1,如果减为0,则可以删除这把锁了。

local key = KEYS[1]; --锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; --锁的自动释放时间
--判断当前锁是否还是被自己持有
if(redis.call('HEXISTS',key,threadId) == 0) then
	return nil; --如果不是自己,则直接返回
end;
-- 是自己的锁,则重入次数-1
local count = redis.call('HINCRBY',key,threadId,-1);
if(count > 0) then 
    	redis.call('Expire',key,releaseTime);
    	return nil;
else
    	redis.call('del',key);
    	return nil;
end;

1、重试机制

2、超时释放,此时业务未完成;

3、主从一致性问题;

waitTime:获取锁的最大等待时长。第一次获取锁失败后,不会立即返回,而是在最大等待时间内不断的尝试获取锁。如果在最大等待时间内,还没有获取锁,则返回false。

leaseTime:存活时间

TimeUnit:时间单位

分布式锁默认的超时释放时间-看门狗

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) 
    if (leaseTime != -1L) 
        return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
     else 
        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> 
            if (e == null) 
                if (ttlRemaining == null) 
                    this.scheduleExpirationRenewal(threadId);
                

            
        );
        return ttlRemainingFuture;
    

this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),

有个看门狗的超时时间,30 000L ====30秒

RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);

他这个获取锁失败以后,会尝试再次获取。但是,也不是马上尝试获取的,因为别的业务应该还在执行,这样只能加大cpu的负担。此时会进行订阅操作,订阅的是释放锁的信号。

protected RFuture<Boolean> unlockInnerAsync(long threadId) 
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));

这是释放锁的代码,此时释放锁,会进行一个发布的命令。

尝试获取锁的线程,此时进行了订阅。

if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) 
    if (!subscribeFuture.cancel(false)) 
        subscribeFuture.onComplete((res, e) -> 
            if (e == null) 
                this.unsubscribe(subscribeFuture, threadId);
            

        );
    

    this.acquireFailed(waitTime, unit, threadId);
    return false;
 

在等待订阅的过程中,它也不是无限制等待的,最大等待时间就是这个time【最大剩余等待时间】,如果此时间内还未返回释放锁的通知,超时了,就取消订阅,则返回false。

if (ttl >= 0L && ttl < time) 
    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
 else 
    ((RedissonLockEntry)subscribeFuture以上是关于利用Redis一步步实现优惠券的最终秒杀方案的主要内容,如果未能解决你的问题,请参考以下文章

Redis的优惠券秒杀问题之全局唯一ID秒杀下单超卖问题一人一单问题以及集群下的问题

Redis之秒杀下单优化以及认识redis消息队列

Redis之秒杀下单优化以及认识redis消息队列

优惠卷秒杀系统设计秒杀优化 —— 基于阻塞队实现异步秒杀优化 及 基于Lua脚本判断秒杀库存一人一单

使用Redis解决秒杀业务问题分析与解决方案

黑马点评项目总结