ZooKeeper Distributed lock
Posted by xxw
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeper Distributed lock相关的知识,希望对你有一定的参考价值。
- https://segmentfault.com/a/1190000016351095
- http://www.dengshenyu.com/java/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/10/23/zookeeper-distributed-lock.html
Test
// Demo: start five competing workers. Each one opens its own ZooKeeper session,
// queues for the lock, and runs its handler once it holds the lock.
Enumerable.Range(1, 5).ToList().ForEach(i =>
{
    Task.Run(() =>
    {
        var lockHelper = new ZooKeeperLockHelper("localhost:5181");
        lockHelper.OnAcquireLock += (id) =>
        {
            // Simulate 0-9 seconds of work while the lock is held.
            var random = new Random().Next(10);
            Log.Debug("NodeId {@id} executing.....Sleep {@ms} ms", id, random * 1000);
            Thread.Sleep(random * 1000);
            Log.Debug("NodeId {@id} executing success", id);
            return Task.CompletedTask;
        };
        lockHelper.AcquireLock();
    });
});
using org.apache.zookeeper;
using Serilog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
namespace RedisDemo
{
    /// <summary>
    /// Distributed lock built on ZooKeeper ephemeral sequential nodes.
    /// Each instance creates one child node under the lock path; the instance whose
    /// node carries the smallest sequence number holds the lock. When the lock is
    /// obtained, <see cref="OnAcquireLock"/> is raised and the ZooKeeper session is
    /// closed afterwards, which is what releases the lock.
    /// </summary>
    public class ZooKeeperLockHelper : Watcher, IDisposable
    {
        #region event
        /// <summary>
        /// Raised once this instance holds the lock. The argument is the sequence
        /// number of this instance's node.
        /// </summary>
        public event Func<long, Task> OnAcquireLock;
        #endregion

        /// <summary>Sentinel returned by <see cref="ParseNodeId"/> when no sequence number is found.</summary>
        private const long InvalidNodeId = -1L;

        // The sequence number is the digit run at the very end of the node name.
        // Anchoring at the end keeps digits that appear in the lock path itself
        // (e.g. "/lock1/node-0000000007") from being mistaken for the id.
        private static readonly Regex SequenceSuffix = new Regex("([0-9]+)$", RegexOptions.Compiled);

        // Private gate instead of lock(this), so outside code cannot contend on it.
        private readonly object _gate = new object();

        private bool _disposed;
        private ZooKeeper _zooKeeper;
        private Event.KeeperState _currentState;
        private AutoResetEvent _notifyEvent = new AutoResetEvent(false);
        private string _connectionString;
        private bool _hasAcquireLock;
        private string _lockPath;
        private long _currentNodeId;
        private static readonly string DEFAULT_PATH = "/zk";
        private static readonly string NODE_NAME = "node-";

        /// <summary>
        /// Creates the helper and starts connecting to ZooKeeper with a 60 second
        /// session timeout.
        /// </summary>
        /// <param name="connectionString">ZooKeeper connection string, e.g. "host:port".</param>
        public ZooKeeperLockHelper(string connectionString)
        {
            _connectionString = connectionString;
            this.Initialize(_connectionString, TimeSpan.FromSeconds(60));
        }

        /// <summary>
        /// Queues for the lock under <paramref name="path"/>. If this instance's node
        /// is already the smallest one, the lock is taken immediately and
        /// <see cref="OnAcquireLock"/> runs on the calling thread; otherwise the
        /// method returns and the watcher callback takes the lock later.
        /// </summary>
        /// <param name="path">Parent node of the lock; "/zk" is used when null or empty.</param>
        /// <exception cref="TimeoutException">
        /// No connection was established within 10 seconds.
        /// </exception>
        public void AcquireLock(string path = "")
        {
            if (this._hasAcquireLock)
            {
                FireAcquireLock(this._currentNodeId).Wait();
                return;
            }

            if (!WaitConnected(TimeSpan.FromSeconds(10)))
            {
                throw new TimeoutException($"{_connectionString} Cannot Connect ZooKeeper");
            }

            _lockPath = string.IsNullOrEmpty(path) ? DEFAULT_PATH : path;

            var nodePath = _lockPath + "/" + NODE_NAME;
            var createdPath = this._zooKeeper.createAsync(
                nodePath, Encoding.UTF8.GetBytes("data"),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL).Result;
            this._currentNodeId = ParseNodeId(createdPath);

            // watch = true registers this instance (the default watcher) for child changes.
            var children = this._zooKeeper.getChildrenAsync(_lockPath, true).GetAwaiter().GetResult();
            Log.Debug("#-> Begin Acquire Lock CurrentId {@id}", _currentNodeId);
            TryEnterLock(children, "NodeId {@id} Direct Acquire Lock");
        }

        /// <summary>
        /// Takes the lock if this instance's node is the smallest child and the lock
        /// has not been taken by this instance yet.
        /// </summary>
        private void TryEnterLock(ChildrenResult children, string acquiredMessageTemplate)
        {
            if (!this.IsMinNodeId(children, this._currentNodeId))
            {
                return;
            }

            lock (_gate)
            {
                if (this._hasAcquireLock)
                {
                    return;
                }

                Log.Debug(acquiredMessageTemplate, this._currentNodeId);
                this._hasAcquireLock = true;
                this.FireAcquireLock(this._currentNodeId).Wait();
            }
        }

        /// <summary>
        /// Tells whether <paramref name="nodeId"/> is the smallest sequence number
        /// among the children. Children whose name carries no sequence number are
        /// ignored.
        /// </summary>
        /// <param name="childrenResult">Result of listing the lock path's children.</param>
        /// <param name="nodeId">Sequence number to test; negative means "unknown".</param>
        /// <returns>True when <paramref name="nodeId"/> equals the minimum child id.</returns>
        protected bool IsMinNodeId(ChildrenResult childrenResult, long nodeId)
        {
            // 0 is a legitimate id (the first sequential node under a parent), so only
            // negative values are treated as invalid.
            if (nodeId < 0 || childrenResult?.Children == null || childrenResult.Children.Count == 0)
            {
                return false;
            }

            var nodeIds = childrenResult.Children
                .Select(child => ParseNodeId(child))
                .Where(id => id >= 0)
                .ToList();

            return nodeIds.Count > 0 && nodeIds.Min() == nodeId;
        }

        /// <summary>
        /// Extracts the trailing sequence number from a node name or path.
        /// </summary>
        /// <param name="path">Node name or full path, e.g. "/zk/node-0000000003".</param>
        /// <returns>The sequence number, or -1 when the text does not end in digits.</returns>
        protected long ParseNodeId(string path)
        {
            if (string.IsNullOrEmpty(path))
            {
                return InvalidNodeId;
            }

            var match = SequenceSuffix.Match(path);
            long id;
            if (match.Success && long.TryParse(match.Groups[1].Value, out id))
            {
                return id;
            }

            return InvalidNodeId;
        }

        /// <summary>
        /// Creates the ZooKeeper client with this instance as its default watcher.
        /// </summary>
        protected void Initialize(String connectionString, TimeSpan sessionTimeout)
        {
            this._zooKeeper = new ZooKeeper(connectionString, (int)sessionTimeout.TotalMilliseconds, this);
        }

        /// <summary>
        /// Runs every <see cref="OnAcquireLock"/> subscriber to completion and then
        /// closes the ZooKeeper session. The work is done synchronously; the returned
        /// task is already completed.
        /// </summary>
        /// <param name="id">Sequence number of the node that holds the lock.</param>
        public Task FireAcquireLock(long id)
        {
            try
            {
                var handlers = this.OnAcquireLock;
                if (handlers != null)
                {
                    // Wait on each subscriber; a plain multicast invoke would only
                    // return the last subscriber's task.
                    foreach (Func<long, Task> handler in handlers.GetInvocationList())
                    {
                        handler(id).Wait();
                    }
                }
            }
            finally
            {
                // Close even when a handler throws, otherwise the ephemeral node stays
                // and every other waiter is blocked behind it.
                this.CloseConnection();
                Log.Debug("NodeId {@id} Close ZooKeeper Success", id);
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Blocks until the watcher has observed the SyncConnected state.
        /// </summary>
        /// <param name="timeout">Maximum time to wait for each connection notification.</param>
        /// <returns>False when a wait timed out before the connection was observed.</returns>
        public bool WaitConnected(TimeSpan timeout)
        {
            while (this._currentState != Event.KeeperState.SyncConnected)
            {
                if (!_notifyEvent.WaitOne(timeout))
                {
                    return false;
                }
            }

            return true;
        }

        /// <summary>
        /// Closes the ZooKeeper session once; later calls do nothing. Failures while
        /// closing are logged and not rethrown.
        /// </summary>
        protected void CloseConnection()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            if (_zooKeeper == null)
            {
                return;
            }

            try
            {
                this._zooKeeper.closeAsync().ConfigureAwait(false).GetAwaiter().GetResult();
            }
            catch (Exception ex)
            {
                // Best effort: this is also reached from Dispose, so never throw.
                Log.Warning(ex, "NodeId {@id} Close ZooKeeper Failed", this._currentNodeId);
            }
        }

        #region Watcher Impl
        /// <summary>
        /// Watcher callback. A SyncConnected event without a path signals the
        /// connection; an event for the lock path triggers a re-check of the children
        /// on a background task, which also re-registers the watch.
        /// </summary>
        public override Task process(WatchedEvent @event)
        {
            var path = @event.getPath();

            if (@event.getState() == Event.KeeperState.SyncConnected && String.IsNullOrEmpty(path))
            {
                this._currentState = @event.getState();
                this._notifyEvent.Set();
            }

            if (string.IsNullOrEmpty(path) || !string.Equals(path, this._lockPath, StringComparison.Ordinal))
            {
                return Task.CompletedTask;
            }

            Log.Debug("NodeId {@id} Start Watcher Callback", this._currentNodeId);
            if (this._hasAcquireLock)
            {
                Log.Debug("NodeId {@id} Has Acquire Lock return", this._currentNodeId);
                return Task.CompletedTask;
            }

            // Run off the callback so the blocking calls below do not hold it up.
            Task.Run(() =>
            {
                try
                {
                    var childrenResult = _zooKeeper.getChildrenAsync(this._lockPath, this).Result;
                    TryEnterLock(childrenResult, "NodeId {@id} Acquire Lock");
                }
                catch (Exception ex)
                {
                    // Nobody observes this task, so log here or the failure is lost.
                    Log.Error(ex, "NodeId {@id} Watcher Callback Failed", this._currentNodeId);
                }
            });

            return Task.CompletedTask;
        }

        /// <summary>Closes the ZooKeeper session.</summary>
        public void Dispose()
        {
            this.CloseConnection();
        }
        #endregion
    }
}
以上是关于ZooKeeper Distributed lock的主要内容,如果未能解决你的问题,请参考以下文章
Mycat(18):mycat全局主键的生成方式之zookeeper方式