ZooKeeper Distributed lock

Posted byxxw

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeper Distributed lock相关的知识,希望对你有一定的参考价值。

  • https://segmentfault.com/a/1190000016351095
  • http://www.dengshenyu.com/java/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/10/23/zookeeper-distributed-lock.html

Test

Enumerable.Range(1, 5).ToList().ForEach(i =>
             
                 Task.Run(() =>
                 

                     var lockHelper = new ZooKeeperLockHelper("localhost:5181");
                     lockHelper.OnAcquireLock += (id) =>
                     
                         var random = new Random().Next(10);
                         Log.Debug("NodeId @id executing.....Sleep @ms ms", id, random * 1000);
                         
                         Thread.Sleep(random * 1000);
                         Log.Debug("NodeId @id executing success", id);

                         return Task.CompletedTask;
                     ;

                     lockHelper.AcquireLock();
                    
                );

             );
using org.apache.zookeeper;
using Serilog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

namespace RedisDemo

    public class ZooKeeperLockHelper : Watcher, IDisposable
    

        #region event

        public event Func<long, Task> OnAcquireLock;
        #endregion

        private bool _disposed;

        private ZooKeeper _zooKeeper;
        private Event.KeeperState _currentState;
        private AutoResetEvent _notifyEvent = new AutoResetEvent(false);


        private string _connectionString;

        private bool _hasAcquireLock;
        private string _lockPath;
        private long _currentNodeId;

        private static readonly string DEFAULT_PATH = "/zk";
        private static readonly string NODE_NAME = "node-";

        public ZooKeeperLockHelper(string connectionString)
        
            _connectionString = connectionString;
            this.Initialize(_connectionString, TimeSpan.FromSeconds(60));
        

        public void AcquireLock(string path = "")
        
            if (this._hasAcquireLock)
            
                FireAcquireLock(this._currentNodeId).Wait();
                return;
            

            if (!WaitConnected(TimeSpan.FromSeconds(10)))
            
                throw new Exception($"_connectionString Cannot Connect ZooKeeper");
            

            _lockPath = path;
            if (string.IsNullOrEmpty(_lockPath))
            
                _lockPath = DEFAULT_PATH;
            

            var nodePath = _lockPath + "/" + NODE_NAME;

            var spath = this._zooKeeper.createAsync(
                nodePath, Encoding.UTF8.GetBytes("data"),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL).Result;
            this._currentNodeId = ParseNodeId(spath);

            var reuslt = this._zooKeeper.getChildrenAsync(_lockPath, true).GetAwaiter().GetResult();
            Log.Debug("#-> Begin Acquire Lock CurrentId @id", _currentNodeId);

            if (this.IsMinNodeId(reuslt, this._currentNodeId))
            
                lock (this)
                
                    if (!this._hasAcquireLock)
                    
                        Log.Debug("NodeId @id Direct Acquire Lock", _currentNodeId);
                        this._hasAcquireLock = true;
                        this.FireAcquireLock(this._currentNodeId).Wait();
                    

                
            

        

        protected bool IsMinNodeId(ChildrenResult childrenResult, long nodeId)
        
            if (nodeId == 0 || childrenResult == null || childrenResult.Children.Count == 0)
                return false;

            var nodeIds = new List<long>();

            foreach (var item in childrenResult.Children)
            
                nodeIds.Add(ParseNodeId(item));
            

            if (nodeIds.Count > 0 && nodeIds.Min() == nodeId)
            
                return true;

            
            return false;
        

        protected long ParseNodeId(string path)
        
            var m = Regex.Match(path, "(\\d+)");
            if (m.Success)
            
                return long.Parse(m.Groups[0].Value);
            
            return 0L;
        

        protected void Initialize(String connectionString, TimeSpan sessionTimeout)
        
            this._zooKeeper = new ZooKeeper(connectionString, (int)sessionTimeout.TotalMilliseconds, this);
        

        public Task FireAcquireLock(long id)
        
            this.OnAcquireLock(id).Wait();
            this.CloseConnection();
            Log.Debug("NodeId @id Close ZooKeeper Success", id);
            return Task.CompletedTask;
        

        public bool WaitConnected(TimeSpan timeout)
        
            var continueWait = false;
            while (this._currentState != Event.KeeperState.SyncConnected)
            
                continueWait = _notifyEvent.WaitOne(timeout);
                if (!continueWait)
                
                    return false;
                
            
            return true;
        

        protected void CloseConnection()
        
            if (_disposed)
            
                return;
            
            _disposed = true;

            if (_zooKeeper != null)
            
                try
                
                    this._zooKeeper.closeAsync().ConfigureAwait(false).GetAwaiter().GetResult();
                
                catch  

            
        

        #region Watcher Impl

        public override Task process(WatchedEvent @event)
        
            if (@event.getState() == Event.KeeperState.SyncConnected)
            
                if (String.IsNullOrEmpty(@event.getPath()))
                
                    this._currentState = @event.getState();
                    this._notifyEvent.Set();
                

                var path = @event.getPath();
                if (!string.IsNullOrEmpty(path))
                
                    if (path.Equals(this._lockPath))
                    
                        Log.Debug("NodeId @id Start Watcher Callback", this._currentNodeId);

                        if (this._hasAcquireLock)
                        
                            Log.Debug("NodeId @id Has Acquire Lock return", this._currentNodeId);
                            return Task.CompletedTask;
                        

                        Task.Run(() =>
                        
                            var childrenResult = _zooKeeper.getChildrenAsync(this._lockPath, this).Result;

                            if (IsMinNodeId(childrenResult, this._currentNodeId))
                            
                                lock (this)
                                
                                    if (!this._hasAcquireLock)
                                    
                                        Log.Debug("NodeId @id Acquire Lock", this._currentNodeId);
                                        this._hasAcquireLock = true;
                                        this.FireAcquireLock(this._currentNodeId).Wait();
                                    
                                
                            
                        );

                        //_zooKeeper.getChildrenAsync(_lockPath, this);

                    
                
            

            return Task.CompletedTask;
        



        public void Dispose()
        
            this.CloseConnection();
        

        #endregion
    

以上是关于ZooKeeper Distributed lock的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper的介绍与基本部署

Linux安装HBase使用Zookeeper设置

Mycat(18):mycat全局主键的生成方式之zookeeper方式

ZooKeeper : Curator框架之共享计数器DistributedAtomicLong

storm.yaml 配置项

二:Storm的配置项说明