基于Zookeeper实现分布式锁实践

Posted smileNicky

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Zookeeper实现分布式锁实践相关的知识,希望对你有一定的参考价值。

基于Zookeeper实现分布式锁实践

1、什么是Zookeeper?

Zookeeper是一个分布式的,开源的分布式应用程序协调服务,是Hadoop和hbase的重要组件。

引用官网的图例:

特征:

  1. zookeeper的数据机构是一种节点树的数据结构,zNode是基本的单位,znode是一种和unix文件系统相似的节点,可以往这个节点存储或向这个节点获取数据
  2. 通过客户端可以对znode进行数据操作,还可以注册watcher监控znode的改变

2、Zookeeper节点类型

  • 持久节点(Persistent)
  • 持久顺序节点(Persistent_Sequential)
  • 临时节点(Ephemeral)
  • 临时顺序节点(Ephemeral_Sequential)

3、Zookeeper环境搭建

下载zookeeper,官网链接,https://zookeeper.apache.org/releases.html#download,去官网找到对应的软件下载到本地

修改配置文件,$ZOOKEEPER_HOME\\conf,找到zoo_sample.cfg文件,先备份一份,另外一份修改为zoo.cfg

解压后点击zkServer.cmd运行服务端:

4、Zookeeper基本使用

在cmd窗口或者直接在idea编辑器里的terminal输入命令:

zkCli.cmd -server 127.0.0.1:2181

输入命令help查看帮助信息:

ZooKeeper -server host:port -client-configuration properties-file cmd args
        addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE
        addauth scheme auth
        close
        config [-c] [-w] [-s]
        connect host:port
        create [-s] [-e] [-c] [-t ttl] path [data] [acl]
        delete [-v version] path
        deleteall path [-b batch size]
        delquota [-n|-b|-N|-B] path
        get [-s] [-w] path
        getAcl [-s] path
        getAllChildrenNumber path
        getEphemerals path
        history
        listquota path
        ls [-s] [-w] [-R] path
        printwatches on|off
        quit
        reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
        redo cmdno
        removewatches path [-c|-d|-a] [-l]
        set [-s] [-v version] path data
        setAcl [-s] [-v version] [-R] path acl
        setquota -n|-b|-N|-B val path
        stat [-w] path
        sync path
        version
        whoami

create [-s] [-e] [-c] [-t ttl] path [data] [acl]-s表示顺序节点,-e表示临时节点,若不指定表示持久节点,acl是来进行权限控制的

[zk: 127.0.0.1:2181(CONNECTED) 1] create -s /zk-test 0
Created /zk-test0000000000

查看

[zk: 127.0.0.1:2181(CONNECTED) 4] ls /
[zk-test0000000000, zookeeper]

设置修改节点数据

set /zk-test 123

获取节点数据

get /zk-test

ps,zookeeper命令详情查看help帮助文档,也可以去官网看看文档

ok,然后java写个例子,进行watcher监听

package com.example.concurrent.zkSample;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

/**
 * <pre>
 *      Zookeeper 例子
 * </pre>
 *
 * <pre>
 * @author mazq
 * 修改记录
 *    修改后版本:     修改人:  修改日期: 2021/12/09 16:57  修改内容:
 * </pre>
 */
public class ZookeeperSample 

    public static void main(String[] args) 
        ZkClient client = new ZkClient("localhost:2181");
        client.setZkSerializer(new MyZkSerializer());
        client.subscribeDataChanges("/zk-test", new IZkDataListener() 
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception 
                System.out.println("监听到节点数据改变!");
            

            @Override
            public void handleDataDeleted(String dataPath) throws Exception 
                System.out.println("监听到节点数据被删除了");
            
        );

        try 
            Thread.sleep(1000 * 60 * 2);
         catch (InterruptedException e) 
            e.printStackTrace();
        
    


5、Zookeeper应用场景

Zookeeper有什么典型的应用场景:

  1. 注册中心(Dubbo)
  2. 命名服务
  3. Master选举
  4. 集群管理
  5. 分布式队列
  6. 分布式锁

6、Zookeeper分布式锁

Zookeeper适合用来做分布式锁,然后具体实现是利用什么原理?我们知道zookeeper是类似于unix的文件系统,文件系统我们也知道在一个文件夹下面,会有文件名称不能一致的特性的,也就是互斥的特性。同样zookeeper也有这个特性,在同个znode节点下面,子节点命名不能重复。所以利用这个特性可以来实现分布式锁

业务场景:在高并发的情况下面进行订单场景,这是一个典型的电商场景


自定义的Zookeeper序列化类:

package com.example.concurrent.zkSample;


import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import java.io.UnsupportedEncodingException;

public class MyZkSerializer implements ZkSerializer 

    private String charset = "UTF-8";

    @Override
    public byte[] serialize(Object o) throws ZkMarshallingError 
        return String.valueOf(o).getBytes();
    

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError 
        try 
            return new String(bytes , charset);
         catch (UnsupportedEncodingException e) 
            throw new ZkMarshallingError();
        
    


订单编号生成器类,因为SimpleDateFormat是线程不安全的,所以还是要加上ThreadLocal

package com.example.concurrent.zkSample;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

public class OrderCodeGenerator 

    private static final String DATE_FORMAT = "yyyyMMddHHmmss";
    private static AtomicInteger ai  = new AtomicInteger(0);
    private static int i = 0;

    private static ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<SimpleDateFormat>() 
        @Override
        protected SimpleDateFormat initialValue() 
            return new SimpleDateFormat(DATE_FORMAT);
        
    ;

    public static DateFormat getDateFormat() 
        return (DateFormat) threadLocal.get();
    

    public static String generatorOrderCode() 
        try 
            return getDateFormat().format(new Date(System.currentTimeMillis()))
                    + i++;
         finally 
            threadLocal.remove();
        
    




pom.xml加上zookeeper客户端的配置:

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

实现一个zookeeper分布式锁,思路是获取节点,这个是多线程竞争的,能获取到锁,也就是创建节点成功,就执行业务,其它抢不到锁的线程,阻塞等待,注册watcher监听锁是否释放了,释放了,取消注册watcher,继续抢锁

package com.example.concurrent.zkSample;


import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Slf4j
public class ZKDistributeLock implements Lock 

    private String localPath;
    private ZkClient zkClient;

    ZKDistributeLock(String localPath) 
        super();
        this.localPath = localPath;
        zkClient = new ZkClient("localhost:2181");
        zkClient.setZkSerializer(new MyZkSerializer());

    

    @Override
    public void lock() 
        while (!tryLock()) 
            waitForLock();
        
    

    private void waitForLock() 
        // 创建countdownLatch协同
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // 注册watcher监听
        IZkDataListener listener = new IZkDataListener() 
            @Override
            public void handleDataChange(String path, Object o) throws Exception 
                //System.out.println("zookeeper data has change!!!");
            

            @Override
            public void handleDataDeleted(String s) throws Exception 
                // System.out.println("zookeeper data has delete!!!");
                // 监听到锁释放了,释放线程
                countDownLatch.countDown();
            
        ;
        zkClient.subscribeDataChanges(localPath , listener);

        // 线程等待
        if (zkClient.exists(localPath)) 
            try 
                countDownLatch.await();
             catch (InterruptedException e) 
                e.printStackTrace();
            
        

        // 取消注册
        zkClient.unsubscribeDataChanges(localPath , listener);

    

    @Override
    public void unlock() 
        zkClient.delete(localPath);
    

    @Override
    public boolean tryLock() 
        try 
            zkClient.createEphemeral(localPath);
         catch (ZkNodeExistsException e) 
            return false;
        
        return true;
    

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 
        return false;
    

    @Override
    public void lockInterruptibly() throws InterruptedException 
    

    @Override
    public Condition newCondition() 
        return null;
    


订单服务api

package com.example.concurrent.zkSample;


public interface OrderService 
    void createOrder();


订单服务实现类,加上zookeeper分布式锁

package com.example.concurrent.zkSample;

import java.util.concurrent.locks.Lock;


public class OrderServiceInvoker implements OrderService


    @Override
    public void createOrder() 
        Lock zkLock = new ZKDistributeLock("/zk-test");
        //Lock zkLock = new ZKDistributeImproveLock("/zk-test");
        String orderCode = null;
        try 
            zkLock.lock();
            orderCode = OrderCodeGenerator.generatorOrderCode();

         finally 
            zkLock.unlock();
        
        System.out.println(String.format("thread name : %s , orderCode : %s" ,
                Thread.currentThread().getName(),
                orderCode));
    



因为搭建分布式环境比较繁琐,所以这里使用juc里的并发协同工具类,CyclicBarrier模拟多线程并发的场景,模拟分布式环境的高并发场景

package com.example.concurrent.zkSample;


import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class ConcurrentDistributeTest 

    public static void main(String[] args) 
        // 多线程数
        int threadSize = 30;
        // 创建多线程循环屏障
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadSize , ()->
            System.out.println("准备完成!");
        ) ;

        // 模拟分布式集群的场景
        for (int i = 0 ; i < threadSize ; i ++) 
            new Thread(()->
                OrderService orderService = new OrderServiceInvoker();
                // 所有线程都等待
                try 
                    cyclicBarrier.await();
                 catch (InterruptedException e) 
                    e.printStackTrace();
                 catch (BrokenBarrierException e) 
                    e.printStackTrace();
                
                // 模拟并发请求
                orderService.createOrder();
            ).start();
        
    


跑多几次,没有发现订单号重复的情况,分布式锁还是有点效果的

thread name : Thread-6 , orderCode : 202112100945110
thread name : Thread-1 , orderCode : 202112100945111
thread name : Thread-13 , orderCode : 202112100945112
thread name : Thread-11 , orderCode : 202112100945113
thread name : Thread-14 , orderCode : 202112100945114
thread name : Thread-0 , orderCode : 202112100945115
thread name : Thread-8 , orderCode : 202112100945116
thread name : Thread-17 , orderCode : 202112100945117
thread name : Thread-10 , orderCode : 202112100945118
thread name : Thread-5 , orderCode : 202112100945119
thread name : Thread-2 , orderCode : 2021121009451110
thread name : Thread-16 , orderCode : 2021121009451111
thread name : Thread-19 , orderCode : 2021121009451112
thread name : Thread-4 , orderCode : 2021121009451113
thread name : Thread-18 , orderCode : 2021121009451114
thread name : Thread-3 , orderCode : 2021121009451115
thread name : Thread-9 , orderCode : 2021121009451116
thread name : Thread-12 , orderCode : 2021121009451117
thread name : Thread-15 , orderCode : 2021121009451118
thread name : Thread-7 , orderCode : 2021121009451219

注释加锁的代码,再加大并发数,模拟一下

package com.example.concurrent.zkSample;

import java.util.concurrent.locks.Lock;

public class OrderServiceInvoker implements OrderService


    @Override
    public void createOrder() 
        //Lock zkLock = new ZKDistributeLock("/zk-test");
        //Lock zkLock = new ZKDistributeImproveLock("/zk-test");
        String orderCode = null;
        try 
            //zkLock.lock();
            orderCode = OrderCodeGenerator.generatorOrderCode();

         finally 
            //zkLock.unlock();
        
        System.out.println(String.format("thread name : %s , orderCode : %s" ,
                Thread.currentThread().getName(),
                orderCode));
    



跑多几次,发现出现订单号重复的情况,所以分布式锁是可以保证分布式环境的线程安全的

7、公平式Zookeeper分布式锁

上面例子是一种非公平锁的方式,一旦监听到锁释放了,所有线程都会去抢锁,所以容易出现“惊群效应”

  • 巨大的服务器性能损耗
  • 网络冲击
  • 可能造成宕机

所以,需要改进分布式锁,改成一种公平锁的模式

  • 公平锁:多个线程按照申请锁的顺序去获取锁,线程会在队列里排队,按照顺序去获取锁。只有队列第1个线程才能获取到锁,获取到锁之后,其它线程都会阻塞等待,等到持有锁的线程释放锁,其它线程才会被唤醒。
  • 非公平锁:多个线程都会去竞争获取锁,获取不到就进入队列等待,竞争得到就直接获取锁;然后持有锁的线程释放锁之后,所有等待的线程就都会去竞争锁。

流程图:

代码改进:

package com.example.concurrent.zkSample;

import

以上是关于基于Zookeeper实现分布式锁实践的主要内容,如果未能解决你的问题,请参考以下文章

ZooKeeper分布式锁简单实践

基于zookeeper实现分布式锁

基于zookeeper实现分布式锁

基于redis的分布式锁的分析与实践

基于zookeeper的分布式锁实现

zookeeper学习实践1-实现分布式锁