Redis实现分布式锁---业务真实使用

Posted 小王子jvm

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis实现分布式锁---业务真实使用相关的知识,希望对你有一定的参考价值。

分布式场景问题

商品超卖问题

这个问题是做项目时候学习到的,例如下单流程:同一个节点中的并发访问安全可以使用synchnized来保证线程的安全,但是不同节点在同一个业务这是无法保证安全的。

怎么解决分布式并发问题

使用redis实现分布式的锁

将上面的步骤进行一个拆分:

总的来说就是在做这些操作之前一定需要对每一个商品进行上锁,这样其他节点就无法操作。

使用Redis实现

这里使用项目中的真实案例来实现这个过程。一个下单流程代码如下:

/**
* 1.根据购物车选中的cid来查询购物车记录信息
* 2.校验库存
* 3.生成订单信息
* 4.生成快照信息
* 5.扣减库存
* 6.删除购物车中记录(已生成订单的就可以直接删除)
*/
public Map<String,String> addOrder(String cids, Orders order) 
    Map<String,String> map = new HashMap<>();

    //1.根据购物车选中的cid来查询商品信息
    String[] split = cids.split(",");
    List<Integer> list = new ArrayList<>();
    for (String s : split) 
        list.add(Integer.parseInt(s));
    
    List<ShoppingCartVO> shoppingCartVOS = shoppingCartMapper.selectShopCartByCids(list);

    /**
     * 将所有的skuid写入redis ,有一个不满足就不能进行上锁
     */
    boolean isLock = true;
    String skuIds[] = new String[shoppingCartVOS.size()];
    for (int i = 0; i < shoppingCartVOS.size(); i++) 
        String skuId = shoppingCartVOS.get(i).getSkuId();
        //进行上锁
        Boolean ifAbsent = redisTemplate.boundValueOps(skuId).setIfAbsent("sheep");
        if (ifAbsent)
            skuIds[i] = skuId;
        
        isLock = isLock && ifAbsent;
    
    //如果每个商品都没有被其他人操作,既可以进行操作,否则就要释放每一个上过的Lock
    if(isLock)
        try 
            //2.校验库存 --> shoppingCartVOS 中的cartNum < stock即可
            //这里需要再次查询是第一次查询是没有加锁,如果查完被别人先加锁修改了,那么这里的库存就亏不对,所以需要再次查询
            shoppingCartVOS = shoppingCartMapper.selectShopCartByCids(list);
            boolean stockFlag = true;
            for (ShoppingCartVO cartVO : shoppingCartVOS) 
                if(Integer.parseInt(cartVO.getCartNum()) > cartVO.getSkuStock())
                    stockFlag = false;
                    break;
                 
            
			//其他业务细节这里屏蔽,只体现上锁和解锁过程
            if(stockFlag)
                //3. 保存订单:
                //4.生成快照信息OrderItem
                //扣减库存
                //删除购物车记录
                map.put("orderId",orderId);
                map.put("productNames",untitled);
                //之后返回信息
                return map;
            
        catch (Exception e)
            e.printStackTrace();
        finally 
            //释放锁
            for (int i = 0; i < skuIds.length; i++) 
                String skuId = skuIds[i];
                if (skuId != null && !"".equals(skuId))
                    redisTemplate.delete(skuId);
                
            
        
     else 
        //释放锁
        for (int i = 0; i < skuIds.length; i++) 
            String skuId = skuIds[i];
            if (skuId != null && !"".equals(skuId))
                redisTemplate.delete(skuId);
            
        
    

    return null;

这里依然还有几个问题:

  • 如果订单中部分商品加锁成功,但是某一个加锁失败就需要全部释放
  • 如果加锁之后,出现了异常导致这个线程销毁,锁谁来释放?

解决锁无法释放问题

这里根据上面的描述不难理解其实可以通过Redis中设置超时时间来解决,这样到点必定会释放锁资源。

但是这样又出现了另一个问题,假如有线程 t1 和 t2 ,t1加锁设置超时时间,然后由于一些特殊原因运行比较慢,没有处理完业务锁过期了,此时 t2 进行加锁,但是 t1 处理完后会释放锁,这样会把 t2 的给释放掉,那么其他线程又可以进行操作造成并发问题。如图:

解决t1释放t2锁问题

根据上面问题,其实是可以联想到,释放锁的时候去检查这个锁是不是我自己锁住的。上面的代码中不难发现,加锁的时候value是固定的,可以从这里入手,使用一个map集合将这个加锁的商品保存起来,key为加锁的商品id,value是随机生成的字符串(尽可能保证唯一即可)。释放的时候就可以从redis获取,如果有值并且值就是我自己生成的那个字符串才进行释放。

代码根据上面的业务代码进行改造,可以这样设计:

//加锁改造如下:
boolean isLock = true;
String skuIds[] = new String[shoppingCartVOS.size()];
Map<String,String> values = new HashMap<>();
for (int i = 0; i < shoppingCartVOS.size(); i++) 
    String skuId = shoppingCartVOS.get(i).getSkuId();
    
    //这是不再固定
    String uuid = UUID.randomUUID().toString();
    //进行上锁
    Boolean ifAbsent = redisTemplate.boundValueOps(skuId).setIfAbsent(uuid,10, TimeUnit.SECONDS);
    if (ifAbsent)
        skuIds[i] = skuId;
        values.put(skuId,uuid);
    
    isLock = isLock && ifAbsent;

//如果每个商品都没有被其他人操作,既可以进行操作,否则就要释放每一个上过的Lock
if(isLock)

//释放锁改造
for (int i = 0; i < skuIds.length; i++) 
    String skuId = skuIds[i];
    if (skuId != null && !"".equals(skuId))
        
        //从redis中获取值比较与加锁时候是否一直
        String value = redisTemplate.boundValueOps(skuId).get();
        if (value != null && map.get(skuId).equals(value))
            redisTemplate.delete(skuId);
        
    

这里,又有一个问题,这一切看起来好像都完美,但是,释放锁这里依然会有并发问题,比如:

t1 在释放锁的时候if判断完成,正要执行删除锁的代码,结果锁过期了,然后别人拿到这个锁,结果 t1 还是可以释放掉别人的锁,这就是一个问题,这里就需要保证判断和释放应该同时操作,也就是这两个操作要具有原子性

使用lua脚本解决

首先在resource目录下创建脚本文件:unlock.lua,这个lua脚本的意思就是从redis中查询传过来的keys[1]内容的并且跟传过来的ARGV[1]是否相等,如下:

if redis.call("get",KEYS[1]) == ARGV[1] then
 return redis.call("del",KEYS[1])
else
 return 0
end

配置Bean加载lua脚本:

@Bean
public DefaultRedisScript<List> defaultRedisScript()
    DefaultRedisScript<List> defaultRedisScript = new DefaultRedisScript<>();
    defaultRedisScript.setResultType(List.class);
    defaultRedisScript.setScriptSource(new ResourceScriptSource(new                                                         ClassPathResource("unlock.lua")));
    return defaultRedisScript; 

通过执⾏lua脚本解锁:

//执⾏lua脚本
List<String> keys = new ArrayList<>();
keys.add(skuId);
List rs = stringRedisTemplate.execute(defaultRedisScript,keys , values.get(skuId));
System.out.println(rs.get(0));

业务代码释放锁测操作可以改为:(keys中包含skuId这个脚本会查询这个的value,后面的values.get(skuId)就是需要比较的值)

//释放锁
for (int i = 0; i < skuIds.length; i++) 
    String skuId = skuIds[i];
    if (skuId != null && !"".equals(skuId))
        List<String> keys = new ArrayList<>();
        keys.add(skuId);
        List rs = redisTemplate.execute(defaultRedisScript,keys , values.get(skuId));
        System.out.println(rs.get(0));
    

分布式锁框架-Redisson

看门狗机制

线程:⽤于给当前key延⻓过期时间,保证业务线程正常执⾏的过程中,锁不会过期。

说白了就是一个守护线程。

Redission介绍

Redisson在基于NIO的Netty框架上,充分的利⽤了Redis键值数据库提供的⼀系列优势,在Java实⽤⼯具包中常⽤接⼝的基础上,为使⽤者提供了⼀系列具有分布式特性的常⽤⼯具类。使得原本作为协调单机多线程并发程序的⼯具包获得了协调分布式多机多线程并发系统的能⼒,⼤⼤降低了设计和研发⼤规模分布式系统的难度。同时结合各富特⾊的分布式服务,更进⼀步简化了分布式环境中程序相互之间的协作。

使用SpringBoot集成

首先引入依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.0</version>
</dependency>

然后配置如下:

redisson:
   addr:
     singleAddr:
       host: redis://47.96.11.185:6370
       password: 12345678
       database: 0

配置RedissionClient:

@Configuration
public class RedissonConfig 
    @Value("$redisson.addr.singleAddr.host")
    private String host;
    @Value("$redisson.addr.singleAddr.password")
    private String password;
    @Value("$redisson.addr.singleAddr.database")
    private int database;
    @Bean
    public RedissonClient redissonClient()
        Config config = new Config(); 
        config.useSingleServer().
               setAddress(host).
               setPassword(password).
               setDatabase(database);
        return Redisson.create(config);
    

其他使用配置

单机连接配置参考上面即可

集群连接配置如下:

redisson:
   addr:
     cluster:
       host: redis://47.96.11.185:6370, ... ,redis://47.96.11.185:6373
       password: 12345678

客户端配置:

@Bean
public RedissonClient redissonClient()
    Config config = new Config();
    config.useClusterServers()
        .addNodeAddress(hosts.split("[,]"))
        .setPassword(password)
        .setScanInterval(2000)
        .setMasterConnectionPoolSize(10000)
        .setSlaveConnectionPoolSize(10000);
    return Redisson.create(config);


主从配置如下:

redisson:
	addr:
		masterAndSlave:
            masterhost: redis://47.96.11.185:6370
            slavehosts: redis://47.96.11.185:6371,redis://47.96.11.185:6372
            password: 12345678
            database: 0

客户端配置:

@Configuration
public class RedissonConfig3 
    @Value("$redisson.addr.masterAndSlave.masterhost")
    private String masterhost;
    @Value("$redisson.addr.masterAndSlave.slavehosts")
    private String slavehosts;
    @Value("$redisson.addr.masterAndSlave.password")
    private String password;
    @Value("$redisson.addr.masterAndSlave.database")
    private int database;
 
    @Bean
    public RedissonClient redissonClient()
        Config config = new Config();
        config.useMasterSlaveServers()
            .setMasterAddress(masterhost)
            .addSlaveAddress(slavehosts.split("[,]"))
            .setPassword(password)
            .setDatabase(database)
            .setMasterConnectionPoolSize(10000)
            .setSlaveConnectionPoolSize(10000);
        return Redisson.create(config);
    

至此几种Redis的模式都涉及到了

Redission使用‘

  1. 获取公平锁和非公平锁
//获取公平锁
RLock lock = redissonClient.getFairLock(skuId);
//获取⾮公平锁
RLock lock = redissonClient.getLock(skuId);
  1. 加锁——阻塞和非阻塞锁
//阻塞锁(如果加锁成功之后,超时时间为30s;加锁成功开启看⻔狗,剩5s延⻓过期时间)
lock.lock();
//阻塞锁(如果加锁成功之后,设置⾃定义20s的超时时间)
lock.lock(20,TimeUnit.SECONDS);
//⾮阻塞锁(设置等待时间为3s;如果加锁成功默认超时间为30s)
boolean b = lock.tryLock(3,TimeUnit.SECONDS);
//⾮阻塞锁(设置等待时间为3s;如果加锁成功设置⾃定义超时间为20s)
boolean b = lock.tryLock(3,20,TimeUnit.SECONDS);

//释放锁
lock.unlock();
  1. 公平非阻塞锁
//公平⾮阻塞锁
RLock lock = redissonClient.getFairLock(skuId);
boolean b = lock.tryLock(3,20,TimeUnit.SECONDS);

至此几种基本的锁使用都涉及到了

上面业务代码的优化

首先先抽象出这个业务结构:

//伪代码如下:
HashMap map = null;
加锁
try
    if(isLock)
        校验库存
        if(库存充⾜)
              保存订单
              保存快照
              修改库存
              删除购物⻋
              map = new HashMap();
              ...
        
    
catch(Exception e)
    e.printStackTrace();
finally
    释放锁

return map;

在上面的业务代码中就可以优化成这个写法

/**
* 1.根据购物车选中的cid来查询购物车记录信息
* 2.校验库存
* 3.生成订单信息
* 4.生成快照信息
* 5.扣减库存
* 6.删除购物车中记录(已生成订单的就可以直接删除)
*/
public Map<String,String> addOrder(String cids, Orders order) 
    Map<String,String> map = null;

    //1.根据购物车选中的cid来查询商品信息
    String[] split = cids.split(",");
    List<Integer> list = new ArrayList<>();
    for (String s : split) 
        list.add(Integer.parseInt(s));
    
    List<ShoppingCartVO> shoppingCartVOS = shoppingCartMapper.selectShopCartByCids(list);

    /**
    * 将所有的skuid写入redis ,有一个不满足就不能进行上锁
    */
    boolean isLock = true;
    String skuIds[] = new String[shoppingCartVOS.size()];
    Map<String,RLock> locks = new HashMap<>();
    for (int i = 0; i < shoppi

以上是关于Redis实现分布式锁---业务真实使用的主要内容,如果未能解决你的问题,请参考以下文章

Redis实现分布式锁---业务真实使用

Redis分布式锁Redisson原理

使用Redis实现分布式锁

分布式锁方案论证与实现

Redis实现分布式锁

Redis学习Redis分布式锁实现秒杀业务(乐观锁悲观锁)