三天学会ZooKeeper第二天(全网最细)

Posted 活跃的咸鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了三天学会ZooKeeper第二天(全网最细)相关的知识,希望对你有一定的参考价值。

本篇文章参考黑马程序员ZooKeeper教程

三天学会ZooKeeper第一天(全网最细)
三天学会ZooKeeper第三天(全网最细)

Zookeeper session 基本原理

客户端与服务端之间的连接是基于 TCP 长连接,client 端连接 server 端默认的 2181 端口,也就是 session 会话。从第一次连接建立开始,客户端开始会话的生命周期,客户端向服务端的ping包请求,每个会话都可以设置一个超时时间。

Session 的创建
sessionID: 会话ID,用来唯一标识一个会话,每次客户端创建会话的时候,zookeeper 都会为其分配一个全局唯一的 sessionID。zookeeper 创建 sessionID SessionTrackerImpl 中的源码。
在这里插入图片描述
Timeout:会话超时时间。客户端在构造 Zookeeper 实例时候,向服务端发送配置的超时时间,server 端会根据自己的超时时间限制最终确认会话的超时时间。

TickTime:下次会话超时时间点,默认 2000 毫秒。可在 zoo.cfg 配置文件中配置,便于 server 端对 session 会话实行分桶策略管理。

isClosing:该属性标记一个会话是否已经被关闭,当 server 端检测到会话已经超时失效,该会话标记为"已关闭",不再处理该会话的新请求。

Session 的状态

connecting:连接中,session 一旦建立,状态就是 connecting 状态,时间很短。

connected:已连接,连接成功之后的状态。

closed:已关闭,发生在 session 过期,一般由于网络故障客户端重连失败,服务器宕机或者客户端主动断开。

会话超时管理(分桶策略+会话激活)

zookeeper 的 leader 服务器再运行期间定时进行会话超时检查,时间间隔是 ExpirationInterval,单位是毫秒,默认值是 tickTime,每隔 tickTime 进行一次会话超时检查。
在这里插入图片描述
ExpirationTime 的计算方式:

ExpirationTime = CurrentTime + SessionTimeout;
ExpirationTime = (ExpirationTime / ExpirationInterval + 1) * ExpirationInterval;

在 zookeeper 运行过程中,客户端会在会话超时过期范围内向服务器发送请求(包括读和写)或者 ping 请求,俗称心跳检测完成会话激活,从而来保持会话的有效性。

会话激活流程:
在这里插入图片描述
激活后进行迁移会话的过程,然后开始新一轮:
在这里插入图片描述

zookeeper 事件监听机制

watcher概念

zookeeper提供了数据的发布/订阅功能,多个订阅者可同时监听某一特定主题对象,当该主题对象的自身状态发生变化时(例如节点内容改变、节点下的子节点列表改变等),会实时、主动通知所有订阅者zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力。watcher机制实际上与观察者模式类似,也可看作是一种观察者模式在分布式场景下的实现方式。

watcher架构

  1. Zookeeper服务端
  2. Zookeeper客户端
  3. 客户端的ZKWatchManager对象
    在这里插入图片描述
    客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watch管理器中。当ZooKeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端,接着客户端的Watch管理器会触发相关Watcher来回调相应处理逻辑,从而完成整体的数据发布/订阅流程。

watcher特性
在这里插入图片描述
watcher接口设计

Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。
Watcher内部包含了两个枚举类:KeeperState、EventType
在这里插入图片描述
Watcher通知状态(KeeperState)

KeeperState是客户端与服务端连接状态发生变化时对应的通知类型。路径为
org.apache.zookeeper.Watcher.Event.KeeperState,是一个枚举类,其枚举属性
如下:
在这里插入图片描述
Watcher事件类型(EventType)

EventType是数据节点(znode)发生变化时对应的通知类型。EventType变化时
KeeperState永远处于SyncConnected通知状态下;当KeeperState发生变化时,EventType永远为None。其路径为org.apache.zookeeper.Watcher.Event.EventType,是一个枚举类,枚举属性如下:
在这里插入图片描述
注:客户端接收到的相关事件通知中只包含状态及类型等信息,不包括节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需调用get等方法重新获取;

捕获相应的事件

下表以node-x节点为例,说明调用的注册方法和可监听事件间的关系:
在这里插入图片描述

客服端与服务器的连接状态

KeeperState 通知状态
SyncConnected:客户端与服务器正常连接时
Disconnected:客户端与服务器断开连接时
Expired:会话session失效时
AuthFailed:身份认证失败时
事件类型为:None

public class ZKConnectionWatcher implements Watcher {
    private static String connectString = "192.168.226.144:2181";
    // 计数器对象
    static CountDownLatch countDownLatch = new CountDownLatch(1);
    // 连接对象
    static ZooKeeper zooKeeper;

    @Override
    public void process(WatchedEvent event) {
        try {
            // 事件类型
            if (event.getType() == Event.EventType.None) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("连接创建成功!");
                    countDownLatch.countDown();
                } else if (event.getState() == Event.KeeperState.Disconnected) {
                    System.out.println("断开连接!");
                } else if (event.getState() == Event.KeeperState.Expired) {
                    System.out.println("会话超时!");
                    zooKeeper = new ZooKeeper(connectString, 5000, new ZKConnectionWatcher());
                } else if (event.getState() == Event.KeeperState.AuthFailed) {
                    System.out.println("认证失败!");
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }


    public static void main(String[] args) {
        try {
            zooKeeper = new ZooKeeper(connectString, 5000, new ZKConnectionWatcher());
            // 阻塞线程等待连接的创建
            countDownLatch.await();
            // 会话id
            System.out.println(zooKeeper.getSessionId());
            // 添加授权用户
            zooKeeper.addAuthInfo("digest", "zzn:1234561".getBytes());
            byte[] bs = zooKeeper.getData("/node1", false, null);
            System.out.println(new String(bs));
            Thread.sleep(50000);
            zooKeeper.close();
            System.out.println("结束");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

检查节点是否存在

// 使用连接对象的监视器
exists(String path, boolean b)
// 自定义监视器
exists(String path, Watcher w)
// NodeCreated:节点创建
// NodeDeleted:节点删除
// NodeDataChanged:节点内容发生变化

path- znode路径。
b- 是否使用连接对象中注册的监视器。
w-监视器对象。

public class ZKWatcherExists {
    public static String IP = "192.168.226.144:2181";
    ZooKeeper zooKeeper = null;

    @Before
    public void before() throws IOException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // 连接zookeeper客户端
        zooKeeper = new ZooKeeper(IP, 6000, event -> {
            System.out.println("连接对象的参数!");
            // 连接成功
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                countDownLatch.countDown();
            }
            System.out.println("path=" + event.getPath());
            System.out.println("eventType=" + event.getType());
        });
        countDownLatch.await();
    }

    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void watcherExists1() throws KeeperException, InterruptedException {
        // arg1:节点的路径
        // arg2:使用连接对象中的watcher
        Stat exists = zooKeeper.exists("/watcher/node1", true);
        System.out.println(exists == null ? "节点不存在" : "节点存在");
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherExists2() throws KeeperException, InterruptedException {
        // arg1:节点的路径
        // arg2:自定义watcher对象
        zooKeeper.exists("/watcher/node1", event -> {
            System.out.println("自定义watcher");
            System.out.println("path=" + event.getPath());
            System.out.println("eventType=" + event.getType());
        });
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherExists3() throws KeeperException, InterruptedException {
        // watcher一次性
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    System.out.println("自定义watcher");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                    zooKeeper.exists("/watcher/node1", this);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        zooKeeper.exists("/watcher/node1", watcher);
        Thread.sleep(80000);
        System.out.println("结束");
    }

    @Test
    public void watcherExists4() throws KeeperException, InterruptedException {
        // 注册多个监听器对象
        zooKeeper.exists("/watcher/node1", event -> {
            System.out.println("1");
            System.out.println("path=" + event.getPath());
            System.out.println("eventType=" + event.getType());
        });
        zooKeeper.exists("/watcher/node1", event -> {
            System.out.println("2");
            System.out.println("path=" + event.getPath());
            System.out.println("eventType=" + event.getType());
        });
        Thread.sleep(80000);
        System.out.println("结束");
    }
}

查看节点

// 使用连接对象的监视器
getData(String path, boolean b, Stat stat)
// 自定义监视器
getData(String path, Watcher w, Stat stat)
// NodeDeleted:节点删除
// NodeDataChanged:节点内容发生变化

path- znode路径。
b- 是否使用连接对象中注册的监视器。
w-监视器对象。
stat- 返回znode的元数据。

/**
 * getData 监控 Change ,Deleted
 */
public class ZKWatcherGetData {
    public static String IP = "192.168.226.144:2181";
    ZooKeeper zooKeeper = null;

    @Before
    public void before() throws IOException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // 连接zookeeper客户端
        zooKeeper = new ZooKeeper(IP, 6000, event -> {
            System.out.println("连接对象的参数!");
            // 连接成功
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                countDownLatch.countDown();
            }
            System.out.println("path=" + event.getPath());
            System.out.println("eventType=" + event.getType());
        });
        countDownLatch.await();
    }

    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void watcherGetData1() throws KeeperException, InterruptedException {
        // arg1:节点的路径
        // arg2:使用连接对象中的watcher
        zooKeeper.getData("/watcher/node2", true, null);
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherGetData2() throws KeeperException, InterruptedException {
        // arg1:节点的路径
        // arg2:自定义watcher对象
        zooKeeper.getData("/watcher/node2", event -> {
            System.out.println("自定义watcher");
            System.out.println("path=" + event.getPath());
            System.out.println("eventType=" + event.getType());
        }, null);
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherGetData3() throws KeeperException, InterruptedException {
        // 一次性
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    System.out.println("自定义watcher");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                    if (event.getType() == Event.EventType.NodeDataChanged) {
                        zooKeeper.getData("/watcher/node2", this, null);
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        byte[] data = zooKeeper.getData("/watcher/node2", watcher, null);
        System.out.println("data = " + new String(data));
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherGetData4() throws KeeperException, InterruptedException {
        // 注册多个监听器对象
        zooKeeper.getData("/watcher/node2", new Watcher() {
            @Override
            public void process(WatchedEvent event) 三天学会ZooKeeper第一天(全网最细)

ZooKeeper面试题总结

2021年 全网最细大数据学习笔记:Zookeeper 集群

5天学会jaxws-webservice编程第三天

10天学会phpWeChat——第三天:从数据库读取数据到视图

实训第三天