zookeeper 分布式锁

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper 分布式锁相关的知识,希望对你有一定的参考价值。


分布式锁有很多,redis也可以实现分布式锁,

http://shangdc.blog.51cto.com/10093778/1914852(查看redis的分布式锁)


zookeeper分布式锁步骤:

1、zookeeper是一个带有节点的,类似于文件目录,所以我们把锁抽象成目录,zookeeper有一个EPHEMERAL_SEQUENTIAL类型的节点, 多个线程再zookeeper创建的节点的时候,它会帮我们安排好顺序进行创建,所以这个节点下的目录都是顺序的。

2、获取当前目录的最小的节点,判断最小节点是不是当前的自己的节点,如果是说明获取锁成功了,如果不是获取锁失败了。

3、当获取锁的时候失败了,为了避免惊群效应,你要做的就是获取当前自己的节点的上一个节点,然后对该节点进行监听,当上一个节点删除的时候,会触发这个监听,通知该节点。

4、这么做,释放锁的时候,也会通知下一个节点。


什么是惊群效应:理解为肉少狼多,当一个节点删除的时候,凡是订阅了此节点的watcha的监听都会重新获取锁,都要去争夺,如果数量少还好,当数量很大的时候这种设计就是不合理也是浪费资源。


zookeeper的状态和事件类型,提前了解一下。

状态 KeeperState.Disconnected (0)  断开
 * KeeperState.SyncConnected (3)  同步连接状态
 * KeeperState.AuthFailed (4) 认证失败状态
 * KeeperState.ConnectedReadOnly (5)  只读连接状态
 * KeeperState.SaslAuthenticated (6) SASL认证通过状态
 * KeeperState.Expired (-112)  过期状态
 * 
 * // EventType 是事件类型 主要关注 Create Delete DataChanged ChildrenChanged
 * EventType.None (-1), 无
 * EventType.NodeCreated (1),
 * EventType.NodeDeleted (2),
 * EventType.NodeDataChanged (3),  结点数据变化
 * EventType.NodeChildrenChanged (4); 结点子节点变化


下面是代码,自己敲下,理解一下。

package com.lhcis.spider.system.annotation;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author sdc
 *
 */
public class ZooDistributeLock implements Watcher {

	private static final Logger LOG = LoggerFactory.getLogger(ZooDistributeLock.class);

	private static final String LOCK_PATH = "/zkLock";

	// 模拟开启的线程数
	private static final int THREAD_NUM = 5;

	// 用于等待所有线程都连接成功后再执行任务
	private static CountDownLatch startFlag = new CountDownLatch(1);

	// 用于确保所有线程执行完毕
	private static CountDownLatch threadFlag = new CountDownLatch(THREAD_NUM);

	private ZooKeeper zk = null;

	private String currentPath;

	private String lockPath;

	public static void main(String[] args) {
		for (int i = 0; i < THREAD_NUM; i++) {
			final int j = i;
			new Thread() {
				@Override
				public void run() {
					ZooDistributeLock zooDistributeLock = new ZooDistributeLock();
					try {
						zooDistributeLock.connection();
						System.out.println("连接" + j);
						zooDistributeLock.createNode();
						System.out.println("创建" + j);
						zooDistributeLock.getLock();
						System.out.println("获取锁" + j);
					} catch (IOException | InterruptedException | KeeperException e) {
						e.printStackTrace();
					}
				}
			}.start();
		}
		try {
			threadFlag.await();
			LOG.info("所有线程执行完毕...");
		} catch (InterruptedException e) {
			LOG.error(e.getMessage(), e);
		}
	}

	/**
	 * Disconnected为网络闪断时触发的事件,当然其他的拔掉网线、kill zookeeper server ,kill zk
	 * connection也会触发该事件。 SyncConnected为client端重新选择下一个zk
	 * server连接触发的事件,此时watcher有效,也就是能正常感知
	 * Expired为客户端重新连server时,服务端发现该session超过了设定的时长,返回给client
	 * Expired,此时watcher失效,也就是不能正常感知
	 */
	@Override
	public void process(WatchedEvent event) {

		Event.KeeperState state = event.getState();
		Event.EventType type = event.getType();

		if (Event.KeeperState.SyncConnected == state) {
			if (Event.EventType.None == type) {
				// 标识连接成功
				LOG.info("成功连接上ZK服务器");
				startFlag.countDown();
			}

			if (Event.EventType.NodeDeleted == type && event.getPath().equals(this.lockPath)) {
				LOG.info("node:" + this.lockPath + "的锁已经被释放");
				try {
					// 上一个节点释放了,当前节点去获取锁
					getLock();
				} catch (KeeperException | InterruptedException e) {
					LOG.error(e.getMessage(), e);
				}
			}
		}

	}

	/**
	 * 连接到 ZK
	 *
	 * @throws IOException
	 */
	private void connection() throws IOException, InterruptedException {

		zk = new ZooKeeper("127.0.0.1:2181", 5000, this);

		// 等待连接成功后再执行下一步操作
		startFlag.await();
	}

	// 创建节点,并初始化当前路径
	private void createNode() throws KeeperException, InterruptedException, UnsupportedEncodingException {
		this.currentPath = this.zk.create(LOCK_PATH, "".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
	}

	private void getLock() throws KeeperException, InterruptedException {
		if (minNode()) {
			doSomething();
			// 释放锁
			releaseLock();
		}
	}

	/**
	 * 当前是否为最小节点
	 *
	 * @return
	 */
	private boolean minNode() {

		// 当前序号
		try {
			initLockPath();
			// 判断前一个节点存在不存在,如果存在,则表示当前节点不是最小节点
			// zk.getData(this.lockPath, this, new Stat());
			zk.getData(this.lockPath, true, new Stat());
			LOG.info(this.currentPath + " 不是最小值,没有获取锁,等待 " + this.lockPath + " 释放锁");
			return false;
		} catch (KeeperException e) {
			LOG.info(this.currentPath + " 是最小值,获得锁");
			return true;
		} catch (InterruptedException e) {
			LOG.error(e.getMessage(), e);
		}
		return true;
	}

	private void doSomething() {
		LOG.info("处理业务逻辑...");
	}

	/**
	 * 释放锁并关闭连接
	 *
	 * @throws KeeperException
	 * @throws InterruptedException
	 */
	private void releaseLock() throws KeeperException, InterruptedException {
		Thread.sleep(2000);
		if (this.zk != null) {
			LOG.info(this.currentPath + " 业务处理完毕,释放锁...");
			zk.delete(this.currentPath, -1);
			this.zk.close();
			LOG.info(Thread.currentThread().getName() + "关闭 zookeeper 连接");
		}
		threadFlag.countDown();
	}

	/**
	 * 初始化 lockpath
	 */
	private void initLockPath() {

		int currentSeq = Integer.parseInt(this.currentPath.substring(LOCK_PATH.length()));

		// 上一个序号
		int preSeq = currentSeq - 1;

		String preSeqStr = String.valueOf(preSeq);
		while (preSeqStr.length() < 10) {
			preSeqStr = "0" + preSeqStr;
		}
		this.lockPath = LOCK_PATH + preSeqStr;
	}

}



参考代码:

https://juejin.im/entry/596438bc6fb9a06bb47495f1


本文出自 “不积跬步无以至千里” 博客,请务必保留此出处http://shangdc.blog.51cto.com/10093778/1958619

以上是关于zookeeper 分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper分布式锁

zooKeeper实现分布式锁

ZooKeeper分布式锁简单实践

基于zookeeper的分布式锁实现

zookeeper怎么实现分布式锁

基于Zookeeper实现的分布式互斥锁 - InterProcessMutex