分布式锁(rediszk)
Posted xue_yun_xiang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式锁(rediszk)相关的知识,希望对你有一定的参考价值。
一、概述
什么是锁?
synchronized ,lock 都是锁
锁的作用
保证 当前线程访问该方法或者代码块 时,方法内、代码块中的 资源(共享变量)同一时刻只有一个线程在访问,线程安全
synchronized 、lock的特性
1.保证synchronized ,lock 代码 都是 同一时刻单线程运行
2.同时能够保证 代码块内 的 共享变量具有 可见性,保证其他线程能够立即看到 代码块内的共享变量的值
二、volatile
内存语义两个
可见性: 当一个线程修改了某一个变量值,这个值会被其他线程立即可见
禁止指令重排: 禁止jvm 优化代码的执行顺序
int a = 1;
int b = 1;
inc = a +b;
jvm 先运行 int a = 1; 或者 int b = 1; 对于代码执行结果没有影响,但是jvm在给你保证单线程符合你的代码逻辑情况下,进行优化 有可能 先执行 int b = 1; 在执行 int a = 1;
https://www.jianshu.com/p/a9ecac4bd753
如果是单例模式
public class DoubleCheckedLocking { //1
private static Instance instance; //2
public static Instance getInstance(){ //3
if(instance ==null) { //4:第一次检查
synchronized (DoubleCheckedLocking.class) { //5:加锁
if (instance == null) //6:第二次检查
instance = new Instance(); //7:问题的根源处在这里
} //8
} //9
return instance; //10
} //11
}
instance = new Instance(); 这一行代码 在jvm 底层 有多行执行
memory=allocate(); //1:分配对象的内存空间
ctorInstance(memory); //2:初始化对象
instance = memory; //3:设置instance指向刚分配的内存地址
jvm 优化以后,有没有有可能
memory=allocate(); //1:分配对象的内存空间
instance = memory; //3:设置instance指向刚分配的内存地址
此时 没有初始化对象 ,如果被其他线程看到 ,会造成获取一个没有初始化的对象,造成异常
如何解决 ? 将 instance 使用 volatile 修饰,禁止 执行重排 禁止创建对象的三步优化,
三、实现
1、创建工程,引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
</parent>
<dependencies>
<!--springBoot 相关 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
2、启动类
3、商品秒杀
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 商品 秒杀相关
*/
@RestController
public class GoodsController {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 初始化商品订单 库存
* @param goods
* @return
*/
@RequestMapping("/init")
public String init(String goods){
// 初始化 stack 为 1000 订单 0
redisTemplate.opsForValue().set("stack_"+goods, 1000+"");
redisTemplate.opsForValue().set("order_"+goods, 0+"");
return "当前商品 " +goods +" 库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+"--订单:"
+ redisTemplate.opsForValue().get("order_"+goods);
}
// synchronized 加锁 保证 当前应用 单线程 执行 kill ,保证线程安全
// synchronized 只能保证单个应用的线程安全,如果 对应的代码块的资源被多个引用,
// 则synchronized 不能起到线程安全的作用,此时需要一个全局的,夸应用的锁--》分布式锁(zk(临时有序节点) ,redis(setnx))
@RequestMapping("/kill")
public synchronized String kill(String goods){
// 1.获取当前的库存 并判断 数量
Integer currentStack = Integer.valueOf( redisTemplate.opsForValue().get("stack_"+goods));
if (currentStack <=0 ){ // 没有库存
return "抱歉,已售罄" + goods;
}
// 2.削减库存
redisTemplate.opsForValue().set("stack_"+goods, (currentStack -1)+"");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3.订单增加 1
Integer order = Integer.valueOf( redisTemplate.opsForValue().get("order_"+goods));
redisTemplate.opsForValue().set("order_"+goods, (order +1 )+"");
return "已抢到到该商品" + goods;
}
/**
* 获取 当前商品 对应的 库存 和 订单 数量
* @param goods
* @return
*/
@RequestMapping("/getState")
public String getState(String goods){
return "当前商品 " +goods +" 库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+"--订单:"
+ redisTemplate.opsForValue().get("order_"+goods);
}
}
4、始化商品库存
http://localhost:8888/init?goods=mate50
5、使用ab 工具 启动 两个应用 8888 8889 分别发起秒杀测试
ab -n 1000 -c 2 http://localhost:8888/kill?goods=mate50
ab -n 请求数 -c 并发数 访问的路径
6、带秒杀完毕出现结果不一致,数据 不安全
四、使用 redis 实现分布式锁
1、创建RedisLock 实现类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class RedisLock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 获取锁的方法
*/
public boolean lock(String key,String value,int time){
// 如果能够设置key 成功,key 不存在 -----》意义就是当前请求获取了 锁
// 设置 key 失败,key 存在 -------》意义 已经有其他请求持有 锁
return stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS );
}
/**
* 删除 key 代表 释放锁
* @param key
* @return
*/
public boolean unlock(String key){
return stringRedisTemplate.delete(key);
}
}
2、使用
@RequestMapping("/redisKill")
public String redisKill(String goods){
if (redisLock.lock(goods, "1", 3)){ // 成功 获取锁
// 1.获取当前的库存 并判断 数量
Integer currentStack = Integer.valueOf( redisTemplate.opsForValue().get("stack_"+goods));
if (currentStack <=0 ){ // 没有库存
return "抱歉,已售罄" + goods;
}
// 2.削减库存
redisTemplate.opsForValue().set("stack_"+goods, (currentStack -1)+"");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3.订单增加 1
Integer order = Integer.valueOf( redisTemplate.opsForValue().get("order_"+goods));
redisTemplate.opsForValue().set("order_"+goods, (order +1 )+"");
//释放锁
redisLock.unlock(goods);
return "已抢到到该商品" + goods;
}else {
return "抱歉抢购该商品" + goods +"失败";
}
}
五、使用zk 实现分布式锁
但是 ,使用zk 本身的监控功能去实现分布式锁,非常繁琐,所以使用 第三方封装的锁
InterProcessMutex 提供互斥锁 来自 curator 框架
1、引入 curator 依赖
<!-- zk 依赖-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
<!--zk 封装 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
2、将CuratorFramework 加入到容器
@Configuration
public class ZkConfig {
/**
* CuratorFramework 封装了 zkClient
* @return
*/
@Bean // 让
public CuratorFramework createCuratorFramework(){
// 客户端重试策略 如果连不上zk server 会发起重试
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,2);
CuratorFramework curatorFramework =
CuratorFrameworkFactory.newClient("192.168.12.130:2181,192.168.12.130:2182,192.168.12.130:2183", retryPolicy);
// 发起连接
curatorFramework.start();
return curatorFramework;
}
}
3、使用zk 作为分布式锁
/**
* 使用zk 完成分布式锁
*
*
* @param goods
* @return
* @throws Exception
*/
@RequestMapping("/zkKill")
public String zkKill(String goods) throws Exception {
// interProcessMutex 就是创建对 zk 分布式锁的封装
InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework,"/"+goods);
// interProcessMutex.acquire 获取锁 ,并且最多持有3 s
// true 代表 持有锁
// false 代表持有锁失败
if (interProcessMutex.acquire(3, TimeUnit.SECONDS)){ // 成功 获取锁
// 1.获取当前的库存 并判断 数量
Integer currentStack = Integer.valueOf( redisTemplate.opsForValue().get("stack_"+goods));
if (currentStack <=0 ){ // 没有库存
return "抱歉,已售罄" + goods;
}
// 2.削减库存
redisTemplate.opsForValue().set("stack_"+goods, (currentStack -1)+"");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3.订单增加 1
Integer order = Integer.valueOf( redisTemplate.opsForValue().get("order_"+goods));
redisTemplate.opsForValue().set("order_"+goods, (order +1 )+"");
//释放锁 释放zk 分布式锁 其实就是删除自己创建的临时节点
interProcessMutex.release();
return "已抢到到该商品" + goods;
}else {
return "抱歉抢购该商品" + goods +"失败";
}
}
4、zk vs redis 分布式锁
实际工作首选redis,因为 redis 基于内存 并发量很高,不易成为性能瓶颈,zk 的存储基于文件(磁盘),并发写的能力很差,另外zk 创建节点是 不仅master要写入,slave 也要 写入,从而更加耗时,所以zk 只适合并发量很低的场合。
以上是关于分布式锁(rediszk)的主要内容,如果未能解决你的问题,请参考以下文章