ZooKeeper ---- Zookeeper基本入门

Posted whc__

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeper ---- Zookeeper基本入门相关的知识,希望对你有一定的参考价值。

一、ZK简介

  • dubbo框架、spring cloud框架、zk注册中心
  • zk实现分布式锁

ZooKeeper,简称ZK,一个分布式的,开放源码的分布式应用程序协调服务,使用Java编写,支持Java和C两种编程语言。

二、ZK内存数据类型

1. 模型结构

image-20210605235623476

2. 模型的特点

  • 每个子目录如/node1都被称作一个znode(节点)。这个znode是被它所在的路径唯一标识
  • znode可以有子节点目录,并且每个znode可以存储数据
  • znode是有版本的,每个znode中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
  • znode可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端

三、节点的分类

  • 持久节点(PERSISTENT)

    是指在节点创建后,就一直存在,直到有删除操作来主动删除这个节点–不会因为创建该节点的客户端会话失效而消失

  • 持久顺序节点(PERSISTENT_SEQUENTIAL)

    这类节点的基本特性和上面的节点类型是一致的。额外的特性是:在ZK中,每个父节点会为它的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整型的最大值

  • 临时节点(EPHEMERAL)

    和持久节点不同的是,如果客户端会话失效,那么这个节点就会自动被清除掉。注意是会话失效,而非连接断开。另外,在临时节点下面不能创建子节点

  • 临时顺序节点(EPHEMERAL_SEQUENTIAL)

    具有临时节点特点,额外的特性是每个父节点会为它的第一级子节点维护一份时序。这点和刚才提到的持久顺序节点类似

四、安装

Docker安装

  1. docker pull zookeeper:3.4.14
  2. docker run --name zk -p 2181:2181 -d zookeeper:3.4.14

进入内部启动

  1. docker exec -it zk bash
  2. cd ./bin
  3. zkCli.sh

以上的安装,只是简单的安装,没有挂载目录文件和配置目录文件之类的,而且进入到zk内部发现没有zoo.cfg文件,所以如果要实现的话,需按照另外一个教程,链接如下:

docker安装zookeeper

五、客户端基本指令

  1. ls path 查看特定节点下面的子节点

  2. create path data 创建一个节点,并给节点绑定数据(默认是持久性节点)

    • create path data 创建持久节点
    • create -s path data 创建持久性顺序节点
    • create -e path data 创建临时节点(注意:临时节点不能含有任何子节点)
    • create -e -s path data 创建临时顺序节点(注意: 临时节点不能含有任何子节点)
  3. stat path 查看节点状态

  4. set path data 修改节点数据

  5. ls2 path 查看节点下孩子和当前节点的状态

  6. history 查看操作历史

  7. get path 获取节点上绑定的数据信息

  8. delete path 删除节点(注意: 删除节点不能含有子节点)

  9. rmr path 递归删除节点

  10. quit 退出当前会话(会话失效)

六、节点监听机制watch

客户端可以监测 znode节点的变化。 Zonode节点的变化触发相应的事件,然后清除对该节点的监测。当监测一个 znode节点时候,
Zookeeper会发送通知给监测节点。一个 Watch事件是一个一次性的触发器,当被设置了 Watch的数据和目录发生了改变的时候,则服务器将这个改变发送给设置了 Watch的客户端以便通知它们。

# 1. ls /path true 监听节点目录的变化
# 2. get /path true 监听节点数据的变化

image-20210607003826701

七、ZooKeeper JavaAPI操作

1、Curator介绍

  • Curator是Apache ZooKeeper的Java客户端库
  • 常见的ZooKeeper Java API:
    • 原生Java API
    • ZkClient
    • Curator
  • Curator项目的目标是简化ZooKeeper客户端的使用

2、Curator API常用操作

image-20210611002243890

2.1 建立连接

/**
*
* @param connectString       连接字符串,zk server地址和端口 "112.74.188.132:2181"
* @param sessionTimeoutMs    会话超时时间,单位ms
* @param connectionTimeoutMs 连接超时时间,单位ms
* @param retryPolicy         重试策略
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder()
      .connectString("112.74.188.132:2181")
      .sessionTimeoutMs(60 * 1000)
      .connectionTimeoutMs(15 * 1000)
      .retryPolicy(retryPolicy)
      .namespace("whc")
      .build();

2.2 添加节点

/**
 * 创建节点: create持久、临时、顺序、
 * 1. 基本创建: create().forPath("")
 * 2. 创建节点 带有数据: create().forPath("",data)
 * 3. 设置节点的类型: create().withMode().forPath("",data)
 * 4. 创建多级节点 /app1/p1 : create().creatingParentsIfNeeded().forPath("",data);
 */
@Test
public void testCreate1() throws Exception {
   // 1. 基本创建
   // 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
   String s = client.create().forPath("/app1");
   System.out.println(s);
}

@Test
public void testCreate2() throws Exception {
   // 2. 创建节点,带有数据存储
   String s = client.create().forPath("/app2", "hehe".getBytes());
   System.out.println(s);
}

@Test
public void testCreate3() throws Exception {
   // 3. 设置节点的类型
   // 默认类型是持久节点
   String s = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
   System.out.println(s);
}

@Test
public void testCreate4() throws Exception {
   // 3. 创建多级节点 /app4/p1
   // creatingParentsIfNeeded如果父节点不存在,则创建父节点
   String s = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
   System.out.println(s);
}

2.3 删除节点

/**
 * 删除节点: delete deleteall
 * 1. 删除单个节点: delete().forPath("/app1");
 * 2. 删除带有子节点的节点: client.delete().deletingChildrenIfNeeded().forPath("/app4");
 * 3. 必须成功的删除:为了防止网络抖动,本质就是重试,client.delete().guaranteed().forPath("/app2");
 * 4. 回调: inBackground
 * @throws Exception
 */
@Test
public void testDelete() throws Exception {
   // 1. 删除单个节点
   client.delete().forPath("/app1");
}

@Test
public void testDelete2() throws Exception {
   // 2. 删除带有子节点的节点
   client.delete().deletingChildrenIfNeeded().forPath("/app4");
}

@Test
public void testDelete3() throws Exception {
   // 3. 必须成功的删除
   client.delete().guaranteed().forPath("/app2");
}

@Test
public void testDelete4() throws Exception {
   // 4. 回调
   client.delete().guaranteed().inBackground(new BackgroundCallback() {
      @Override
      public void processResult(CuratorFramework client, CuratorEvent curatorEvent) throws Exception {
         System.out.println("我被删除了");
         System.out.println(curatorEvent);
      }
   }).forPath("/app1");
}

2.4 修改节点

/**
 * 修改数据
 * 1. 修改数据: setData().forPath()
 * 2. 根据版本修改: setData().withVersion().forPath()
 *        version是通过查询出来的,目的就是为了让其它客户端或者线程不干扰我 
 * @throws Exception
 */
@Test
public void testSet() throws Exception {
   client.setData().forPath("/app1", "whc".getBytes());
}

@Test
public void testSetForVersion() throws Exception {
   // 3. 查询节点状态信息: ls -s
   Stat stat = new Stat();
   System.out.println(stat);
   client.getData().storingStatIn(stat).forPath("/app1");

   int version = stat.getVersion(); // 查询出来的
   System.out.println(version);
   client.setData().withVersion(version).forPath("/app1", "haha".getBytes());
}

2.5 查询节点

/**
 * 查询节点:
 * 1. 查询数据: get: getData().forPath()
 * 2. 查询子节点: ls: getChildren().forPath()
 * 3. 查询节点状态信息: ls -s: getData().storingStatIn(状态对象).forPath()
 */

@Test
public void testGet1() throws Exception {
   // 1. 查询数据: get
   byte[] data = client.getData().forPath("/app1");
   System.out.println(new String(data));
}

@Test
public void testGet2() throws Exception {
   // 2. 查询子节点: ls
   List<String> path = client.getChildren().forPath("/");
   System.out.println(path);
}

@Test
public void testGet3() throws Exception {
   // 3. 查询节点状态信息: ls -s
   Stat stat = new Stat();
   System.out.println(stat);
   client.getData().storingStatIn(stat).forPath("/app1");
}

2.6 Watch事件监听

  • ZooKeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是ZooKeeper实现分布式协调服务的重要特性
  • ZooKeeper中引入了Watcher机制来实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者
  • ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册Watcher,比较繁琐
  • Curator引入了Cache来实现对ZooKeeper服务端事件的监听
  • ZooKeeper提供了三种Watcher:
    • NodeCache: 只是监听某一个特定的节点
    • PathChildrenCache: 监听一个ZNode的子节点
    • TreeCache: 可以监听整个树上的所有结点,类似PathChildrenCache和NodeCache的组合
  1. 给指定一个节点注册监听器

    /**
     * 演示 NodeCache: 给指定一个节点注册监听器
     */
    @Test
    public void testNodeCache() throws Exception {
       // 1. 创建NodeCache对象
       final NodeCache nodeCache = new NodeCache(client, "/app1");
       // 2. 注册监听
       nodeCache.getListenable().addListener(new NodeCacheListener() {
          @Override
          public void nodeChanged() throws Exception {
             System.out.println("节点变化了");
    
             // 获取修改节点后的数据
             byte[] data = nodeCache.getCurrentData().getData();
             System.out.println(new String(data));
          }
       });
       // 3. 开启监听,如果设置为true,则开启监听,加载缓存数据
       nodeCache.start(true);
    
       while(true) {
    
       }
    }
    
  2. 监听某个节点的所有子节点们

    /**
     * 演示 PathChildrenCache: 监听某个节点的所有子节点们
     */
    @Test
    public void testPathChildrenCache() throws Exception {
       // 1. 创建监听对象
       PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
    
       // 2. 绑定监听器
       pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
          @Override
          public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
             System.out.println("子节点变化了..");
             System.out.println(pathChildrenCacheEvent);
             // 监听子节点的数据变更,并且拿到变更后的数据
             // 1. 获取类型
             PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
             // 2. 判断类型是否是update
             if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                byte[] data = pathChildrenCacheEvent.getData().getData();
                System.out.println(new String(data));
             }
          }
       });
    
       // 3. 开启
       pathChildrenCache.start();
    
       while(true) {
    
       }
    }
    
  3. 监听某个节点自己和所有子节点们

    /**
     * 演示 TreeCache: 监听某个节点自己和所有子节点们
     */
    @Test
    public void testTreeCache() throws Exception {
       // 1. 创建监听器
       TreeCache treeCache = new TreeCache(client, "/app2");
    
       // 2. 注册监听
       treeCache.getListenable().addListener(new TreeCacheListener() {
          @Override
          public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
             System.out.println("节点变化了");
             System.out.println(treeCacheEvent);
          }
       });
    
       // 3. 开启
       treeCache.start();
    
       while (true){
    
       }
    }
    

八、集群架构

集群(cluster)

# 集群(cluster)
- 集合同一种软件服务的多个节点同时提供服务

# 集群解决问题
- 单节点的并发访问的压力问题
- 单节点故障问题(如硬件老化,自然灾害等)

1、集群介绍

集群架构

image-20210611001226571

Leader选举:

  • Serverid:服务器ID

    比如有三台服务器,编号分别是1,2,3。编号越大在选举算法中的权重越大。

  • Zxid:数据ID

    服务器中存放的最多数据ID,值越大说明数据越新,在选举算法中数据越新权重越大

  • 在Leader选举的过程中,如果某台ZooKeeper获得了超过半数的选票,则此ZooKeeper就可以成为Leader了

九、分布式锁

1、概念

  • 在我们进行单机应用开发,涉及并发同步的时候,我们往往采用 synchronized或者lock的方式来解决多线程间的代码同步问题
    这时多线程的运行都是在同一个JVM之下,没有任何问题。

    image-20210610231008903

  • 但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。所以得靠分布式锁组件解决问题,由它来提供一个唯一的锁。

    image-20210610231442043

分布式锁:处理跨机器的进程之间的数据同步问题

2、分类

image-20210610231540265
  • 数据库层面实现的分布式锁:悲观锁、乐观锁

    性能低

  • 基于缓存实现的分布式锁:Redis、Memcache

    性能好

    弊端:Redis做集群后,如果Master节点在没有被其它节点同步的时候,突然宕机,那就可能存在多个人同时获取到锁的情况

  • 基于ZooKeeper实现的分布式锁

    最为可靠

3、ZooKeeper分布式锁原理

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点

image-20210610232844351
  1. 客户端获取锁时,在lock节点下创建临时顺序节点

    - 为什么是临时而不是永久节点?
    假如Client1创建永久节点/lock/1,在处理数据的过程中,假如Client1机器宕机,而Client1此时并没有释放锁,那么剩下的Client2、Client3都无法获取到锁。
    如果创建的是临时节点/lock/1,即使Client1宕机了,意味着连接断开,即会话结束,临时节点会在会话结束之后自动的删除,为了防止锁不被释放。
    
    - 为什么是顺序节点?
    
    
  2. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。

  3. 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。

  4. 如果发现比自己小的那个节点被删除,则客户端的
    Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,
    如果不是则重复以上步骤继续获取到比自己小的一个节点
    并注册监听。

    这就是第一步中为什么创建顺序节点的原因。因为client2创建了子节点/lock/2,它监听者client1的删除操作事件,一旦client1删除了/lock/1/节点,此时client2的watcher收到了相应通知,/lock/2是此时最小的,所以client2能获取到锁。
    

4、ZK JavaAPI操作

Curator实现分布式锁API

在Curator中有五种锁方案:

  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
  • InterProcessMutex:分布式可重入排他锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2:共享信号量

分布式案例-模拟12306售票

image-20210610235116475

加锁实现逻辑是在12306实现的,因为共享资源是存在12306上的,让12306充当ZooKeeper的客户端。

4.1 代码实现

模拟携程、飞猪抢夺票资源

public class LockTest {

	public static void main(String[] args) {
		Ticket12306 ticket12306 = new Ticket12306();

		// 创建客户端
		Thread t1 = new Thread(ticket12306, "携程");
		Thread t2 = new Thread(ticket12306, "飞猪");

		t1.start();
		t2.start();
	}
}

模拟12306提供卖票功能,同时12306充当ZK客户端,最后实现分布式锁

public class Ticket12306 implements Runnable {

   private int tickets = 10; // 数据库的票数

   private InterProcessMutex lock;

   // 初始化lock
   public Ticket12306() {
      // 重试策略
      RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
      CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("112.74.188.132:2181")
            .sessionTimeoutMs(60 * 1000)
            .connectionTimeoutMs(15 * 1000)
            .retryPolicy(retryPolicy)
            .build();

      // 开启连接
      client.start();

      lock = new InterProcessMutex(client, "/lock");
   }

   @Override
   public void run() {
      while(true) {

         // 获取锁
         try {
            lock.acquire(3, TimeUnit.SECONDS);
            if(tickets > 0) {
               System.out.println(Thread.currentThread() + ":" + tickets);
               tickets--;
            }
         } catch (Exception e) {
            e.printStackTrace();
         } finally {
            // 释放锁
            try {
               lock.release();
            } catch (Exception e) {
               e.printStackTrace();
            }
         }
      }
   }
}
verride
   public void run() {
      while(true) {

         // 获取锁
         try {
            lock.acquire(3, TimeUnit.SECONDS);
            if(tickets > 0如果有人问你ZooKeeper是什么,就把这篇文章发给他。

zookeeper收尾+dubbo前瞻

[云原生专题-54]:Kubesphere云治理-操作-通过K8S的应用仓库一键部署微服务应用- 分布式协调服务中间件zookeeper的安装与部署

Zookeeper集群

Zookeeper -- 初识ZookeeperZookeeper的安装和配置Zookeeper命令操作(Zookeeper数据模型 Zookeeper服务端 / 客户端常用命令)

ZooKeeper集群