分布式锁(rediszk)

Posted xue_yun_xiang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式锁(rediszk)相关的知识,希望对你有一定的参考价值。

一、概述

什么是锁?

synchronized ,lock 都是锁

锁的作用

保证 当前线程访问该方法或者代码块 时,方法内、代码块中的 资源(共享变量)同一时刻只有一个线程在访问,线程安全

synchronized 、lock的特性

1.保证synchronized ,lock 代码 都是 同一时刻单线程运行
2.同时能够保证 代码块内 的 共享变量具有 可见性,保证其他线程能够立即看到 代码块内的共享变量的值

二、volatile

内存语义两个

可见性: 当一个线程修改了某一个变量值,这个值会被其他线程立即可见
禁止指令重排: 禁止jvm 优化代码的执行顺序

int a = 1;
int b = 1;
inc = a +b;
jvm 先运行 int a = 1; 或者 int b = 1; 对于代码执行结果没有影响,但是jvm在给你保证单线程符合你的代码逻辑情况下,进行优化 有可能 先执行 int b = 1; 在执行 int a = 1;
https://www.jianshu.com/p/a9ecac4bd753
如果是单例模式
public class DoubleCheckedLocking //1
private static Instance instance; //2
public static Instance getInstance() //3
if(instance ==null) //4:第一次检查
synchronized (DoubleCheckedLocking.class) //5:加锁
if (instance == null) //6:第二次检查
instance = new Instance(); //7:问题的根源处在这里
//8
//9
return instance; //10
//11

instance = new Instance(); 这一行代码 在jvm 底层 有多行执行
memory=allocate(); //1:分配对象的内存空间
ctorInstance(memory); //2:初始化对象
instance = memory; //3:设置instance指向刚分配的内存地址
jvm 优化以后,有没有有可能
memory=allocate(); //1:分配对象的内存空间
instance = memory; //3:设置instance指向刚分配的内存地址
此时 没有初始化对象 ,如果被其他线程看到 ,会造成获取一个没有初始化的对象,造成异常
如何解决 ? 将 instance 使用 volatile 修饰,禁止 执行重排 禁止创建对象的三步优化,

三、实现

1、创建工程,引入依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.4.RELEASE</version>
</parent>


<dependencies>

    <!--springBoot 相关  -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
</dependencies>

2、启动类

3、商品秒杀

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 商品 秒杀相关
 */

@RestController
public class GoodsController 

    @Autowired
    private StringRedisTemplate redisTemplate;


    /**
     * 初始化商品订单 库存
     * @param goods
     * @return
     */
    @RequestMapping("/init")
    public String init(String goods)

        // 初始化 stack 为 1000  订单 0

        redisTemplate.opsForValue().set("stack_"+goods, 1000+"");
        redisTemplate.opsForValue().set("order_"+goods, 0+"");



        return "当前商品 " +goods +" 库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+"--订单:"
                + redisTemplate.opsForValue().get("order_"+goods);
    

    // synchronized 加锁 保证 当前应用 单线程 执行 kill ,保证线程安全
    // synchronized 只能保证单个应用的线程安全,如果 对应的代码块的资源被多个引用,
    // 则synchronized 不能起到线程安全的作用,此时需要一个全局的,夸应用的锁--》分布式锁(zk(临时有序节点) ,redis(setnx))
    @RequestMapping("/kill")
    public synchronized String kill(String goods)

        // 1.获取当前的库存  并判断 数量
        Integer currentStack = Integer.valueOf( redisTemplate.opsForValue().get("stack_"+goods));

        if (currentStack <=0 ) // 没有库存

            return  "抱歉,已售罄" + goods;
        


        // 2.削减库存
        redisTemplate.opsForValue().set("stack_"+goods, (currentStack -1)+"");

        try 
            Thread.sleep(10);
         catch (InterruptedException e) 
            e.printStackTrace();
        

        // 3.订单增加 1
        Integer order = Integer.valueOf( redisTemplate.opsForValue().get("order_"+goods));
        redisTemplate.opsForValue().set("order_"+goods, (order +1 )+"");


        return "已抢到到该商品" + goods;
    
    /**
     * 获取 当前商品 对应的 库存 和 订单 数量
     * @param goods
     * @return
     */
    @RequestMapping("/getState")
    public String getState(String goods)

        return "当前商品 " +goods +" 库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+"--订单:"
                + redisTemplate.opsForValue().get("order_"+goods);
    

4、始化商品库存

http://localhost:8888/init?goods=mate50

5、使用ab 工具 启动 两个应用 8888 8889 分别发起秒杀测试

ab -n 1000 -c 2 http://localhost:8888/kill?goods=mate50
ab -n 请求数 -c 并发数 访问的路径

6、带秒杀完毕出现结果不一致,数据 不安全


四、使用 redis 实现分布式锁

1、创建RedisLock 实现类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
public class RedisLock 

    @Autowired
    private StringRedisTemplate stringRedisTemplate;


    /**
     * 获取锁的方法
     */
    public boolean lock(String key,String value,int time)

       // 如果能够设置key 成功,key 不存在  -----》意义就是当前请求获取了 锁
       //       设置 key  失败,key 存在  -------》意义 已经有其他请求持有 锁
       return stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS );
    


    /**
     * 删除 key  代表 释放锁
     * @param key
     * @return
     */
    public boolean unlock(String key)

        return stringRedisTemplate.delete(key);
    

2、使用

 @RequestMapping("/redisKill")
    public  String redisKill(String goods)

        if (redisLock.lock(goods, "1",  3)) // 成功 获取锁

            // 1.获取当前的库存  并判断 数量
            Integer currentStack = Integer.valueOf( redisTemplate.opsForValue().get("stack_"+goods));

            if (currentStack <=0 ) // 没有库存

                return  "抱歉,已售罄" + goods;
            


            // 2.削减库存
            redisTemplate.opsForValue().set("stack_"+goods, (currentStack -1)+"");

            try 
                Thread.sleep(10);
             catch (InterruptedException e) 
                e.printStackTrace();
            

            // 3.订单增加 1
            Integer order = Integer.valueOf( redisTemplate.opsForValue().get("order_"+goods));
            redisTemplate.opsForValue().set("order_"+goods, (order +1 )+"");

            //释放锁
            redisLock.unlock(goods);

            return "已抢到到该商品" + goods;
        else 

            return "抱歉抢购该商品" + goods +"失败";
        


    

五、使用zk 实现分布式锁

但是 ,使用zk 本身的监控功能去实现分布式锁,非常繁琐,所以使用 第三方封装的锁
InterProcessMutex 提供互斥锁 来自 curator 框架

1、引入 curator 依赖

<!--  zk 依赖-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.0</version>
        </dependency>

        <!--zk  封装  -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.1</version>
        </dependency>

2、将CuratorFramework 加入到容器

@Configuration
public class ZkConfig 



    /**
     * CuratorFramework 封装了 zkClient
     * @return
     */
    @Bean // 让
    public  CuratorFramework createCuratorFramework()

        // 客户端重试策略 如果连不上zk server 会发起重试
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,2);

        CuratorFramework curatorFramework =
                CuratorFrameworkFactory.newClient("192.168.12.130:2181,192.168.12.130:2182,192.168.12.130:2183", retryPolicy);

        // 发起连接
        curatorFramework.start();

        return  curatorFramework;
    


3、使用zk 作为分布式锁

/**
 *  使用zk  完成分布式锁
 *
 *
 * @param goods
 * @return
 * @throws Exception
 */
@RequestMapping("/zkKill")
public  String zkKill(String goods) throws Exception 

    // interProcessMutex 就是创建对 zk 分布式锁的封装
    InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework,"/"+goods);
    // interProcessMutex.acquire 获取锁 ,并且最多持有3 s
    //  true 代表 持有锁
    //  false  代表持有锁失败
    if (interProcessMutex.acquire(3, TimeUnit.SECONDS)) // 成功 获取锁

        // 1.获取当前的库存  并判断 数量
        Integer currentStack = Integer.valueOf( redisTemplate.opsForValue().get("stack_"+goods));

        if (currentStack <=0 ) // 没有库存

            return  "抱歉,已售罄" + goods;
        


        // 2.削减库存
        redisTemplate.opsForValue().set("stack_"+goods, (currentStack -1)+"");

        try 
            Thread.sleep(10);
         catch (InterruptedException e) 
            e.printStackTrace();
        

        // 3.订单增加 1
        Integer order = Integer.valueOf( redisTemplate.opsForValue().get("order_"+goods));
        redisTemplate.opsForValue().set("order_"+goods, (order +1 )+"");

        //释放锁  释放zk 分布式锁 其实就是删除自己创建的临时节点
        interProcessMutex.release();

        return "已抢到到该商品" + goods;
    else 

        return "抱歉抢购该商品" + goods +"失败";
    



4、zk vs redis 分布式锁

实际工作首选redis,因为 redis 基于内存 并发量很高,不易成为性能瓶颈,zk 的存储基于文件(磁盘),并发写的能力很差,另外zk 创建节点是 不仅master要写入,slave 也要 写入,从而更加耗时,所以zk 只适合并发量很低的场合。

以上是关于分布式锁(rediszk)的主要内容,如果未能解决你的问题,请参考以下文章

分布式锁(rediszk)

分布式锁(rediszk)

分布式锁(数据库RedisZK)拍了拍你

带你学会线程锁,进程锁和分布式锁

SpringBoot--防止重复提交(锁机制---本地锁分布式锁)

Redis实现分布式锁与Zookeeper实现分布式锁区别