Zookeeper

Posted java_wxid

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper相关的知识,希望对你有一定的参考价值。

文章目录

1.Zookeeper的特性与节点

在了解Zookeeper之前,需要对分布式相关知识有一定了解,什么是分布式系统呢?通常情况下,单个物理节点很容易达到性能,计算或者容量的瓶颈,所以这个时候就需要多个物理节点来共同完成某项任务,一个分布式系统的本质是分布在不同网络或计算机上的程序组件,彼此通过信息传递来协同工作的系统,而Zookeeper正是一个分布式应用协调框架,在分布式系统架构中有广泛的应用场景。

1. 什么是Zookeeper?

官方文档上这么解释zookeeper,它是一个分布式协调框架,是Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。

2. Zookeeper 核心概念

上面的解释有点抽象,同学们暂时可以理解为 Zookeeper 是一个用于存储少量数据的基于内存的数据库,主要有如下两个核心的概念:文件系统数据结构+监听通知机制。

2.1、 文件系统数据结构

Zookeeper维护一个类似文件系统的数据结构:


每个子目录项都被称作为 znode(目录节点),和文件系统类似,我们能够自由的增加、删除znode,在一个znode下增加、删除子znode。
有四种类型的znode:

1、PERSISTENT-持久化目录节点
客户端与zookeeper断开连接后,该节点依旧存在,只要不手动删除该节点,他将永远存在

2、 PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号

3、EPHEMERAL-临时目录节点
客户端与zookeeper断开连接后,该节点被删除

4、EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号

5、Container 节点(3.5.3 版本新增,如果Container节点下面没有子节点,则Container节点在未来会被Zookeeper自动清除,定时任务默认60s 检查一次)

6、TTL 节点( 默认禁用,只能通过系统配置 zookeeper.extendedTypesEnabled=true 开启,不稳定)

2.2、监听通知机制

客户端注册监听它关心的任意节点,或者目录节点及递归子目录节点

  1. 如果注册的是对某个节点的监听,则当这个节点被删除,或者被修改时,对应的客户端将被通知
  2. 如果注册的是对某个目录的监听,则当这个目录有子节点被创建,或者有子节点被删除,对应的客户端将被通知
  3. 如果注册的是对某个目录的递归子节点进行监听,则当这个目录下面的任意子节点有目录结构的变化(有子节点被创建,或被删除)或者根节点有数据变化时,对应的客户端将被通知。

注意:所有的通知都是一次性的,及无论是对节点还是对目录进行的监听,一旦触发,对应的监听即被移除。递归子节点,监听是对所有子节点的,所以,每个子节点下面的事件同样只会被触发一次。

2.3、Zookeeper 经典的应用场景

  1. 分布式配置中心
  2. 分布式注册中心
  3. 分布式锁
  4. 分布式队列
  5. 集群选举
  6. 分布式屏障
  7. 发布/订阅

3. Zookeeper 实战

3.1. zookeeper安装

Step1:配置JAVA环境,检验环境:

java -version

Step2: 下载解压 zookeeper

wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz

tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz

cd  apache-zookeeper-3.5.8-bin

Step3: 重命名配置文件 zoo_sample.cfg

 cp zoo_sample.cfg  zoo.cfg 

Step4: 启动zookeeper

# 可以通过 bin/zkServer.sh  来查看都支持哪些参数 

bin/zkServer.sh start conf/zoo.cfg

Step5: 检测是否启动成功

echo stat | nc 192.168.109.200 // 前提是配置文件中中讲 stat 四字命令设置了了白名单 

如:

4lw.commands.whitelist=stat

Step6: 连接服务器

bin/zkCli.sh -server ip:port 

3.2. 使用命令行操作zookeeper

输入命令 help 查看zookeeper所支持的所有命令:

[zk: localhost:2181(CONNECTED) 80] help

ZooKeeper -server host:port cmd args

	addauth scheme auth

	close 

	config [-c] [-w] [-s]

	connect host:port

	create [-s] [-e] [-c] [-t ttl] path [data] [acl]

	delete [-v version] path

	deleteall path

	delquota [-n|-b] path

	get [-s] [-w] path

	getAcl [-s] path

	history 

	listquota path

	ls [-s] [-w] [-R] path

	ls2 path [watch]

	printwatches on|off

	quit 

	reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]

	redo cmdno

	removewatches path [-c|-d|-a] [-l]

	rmr path

	set [-s] [-v version] path data

	setAcl [-s] [-v version] [-R] path acl

	setquota -n|-b val path

	stat [-w] path

	sync path
1. 创建zookeeper 节点命令
create [-s] [-e] [-c] [-t ttl] path [data] [acl]

中括号为可选项,没有则默认创建持久化节点
-s: 顺序节点
-e: 临时节点
-c: 容器节点
-t: 可以给节点添加过期时间,默认禁用,需要通过系统参数启用
(-Dzookeeper.extendedTypesEnabled=true, znode.container.checkIntervalMs : (Java system property only) New in 3.5.1: The time interval in milliseconds for each check of candidate container and ttl nodes. Default is “60000”.)

创建节点:

create  /test-node some-data

如上,没有加任何可选参数,创建的就是持久化节点

查看节点:

get  /test-node

修改节点数据:

set /test-node some-data-changed

查看节点状态信息:

stat /test-node 

Stat
cZxid:创建znode的事务ID(Zxid的值)。
mZxid:最后修改znode的事务ID。
pZxid:最后添加或删除子节点的事务ID(子节点列表发生变化才会发生改变)。
ctime:znode创建时间。
mtime:znode最近修改时间。
dataVersion:znode的当前数据版本。
cversion:znode的子节点结果集版本(一个节点的子节点增加、删除都会影响这个版本)。
aclVersion:表示对此znode的acl版本。
ephemeralOwner:znode是临时znode时,表示znode所有者的 session ID。 如果znode不是临时znode,则该字段设置为零。
dataLength:znode数据字段的长度。
numChildren:znode的子znode的数量。

查看节点状态信息同时查看数据

根据状态数据中的版本号有并发修改数据实现乐观锁的功能
比如: 客户端首先获取版本信息, get -s /node-test

/test-node 当前的数据版本是 1 , 这时客户端 用 set 命令修改数据的时候可以把版本号带上

如果在执行上面 set命令前, 有人修改了数据,zookeeper 会递增版本号, 这个时候,如果再用以前的版本号去修改,将会导致修改失败,报如下错误

创建子节点, 这里要注意,zookeeper是以节点组织数据的,没有相对路径这么一说,所以,所有的节点一定是以 / 开头。

create /test-node/test-sub-node

查看子节点信息,比如根节点下面的所有子节点, 加一个大写 R 可以查看递归子节点列表

ls /

查看 /test-node 下面所有的子节点

创建临时节点

create -e /ephemeral data 

create 后跟一个 -e 创建临时节点 , 临时节点不能创建子节点

创建序号节点,加参数 -s

create    /seq-parent  data // 创建父目录,单纯为了分类,非必须

create -s /seq-parent/  data // 创建顺序节点。顺序节点将再seq-parent 目录下面,顺序递增

为了容纳子节点,先创建个父目录 /seq-parent

也可以再序号节点前面带一个前缀

创建临时顺序节点,其它增删查改和其他节点无异,不再贴图

create -s -e  /ephemeral-node/前缀-

创建容器节点

create -c /container

容器节点主要用来容纳字节点,如果没有给其创建子节点,容器节点表现和持久化节点一样,如果给容器节点创建了子节点,后续又把子节点清空,容器节点也会被zookeeper删除。

2. 事件监听机制

针对节点的监听:一定事件触发,对应的注册立刻被移除,所以事件监听是一次性的

get  -w  /path   // 注册监听的同时获取数据

stat -w /path   // 对节点进行监听,且获取元数据信息

针对目录的监听,如下图,目录的变化,会触发事件,且一旦触发,对应的监听也会被移除,后续对节点的创建没有触发监听事件

ls -w /path

针对递归子目录的监听

ls -R -w /path : -R 区分大小写,一定用大写 

如下对/test 节点进行递归监听,但是每个目录下的目录监听也是一次性的,如第一次在/test 目录下创建节点时,触发监听事件,第二次则没有,同样,因为时递归的目录监听,所以在/test/sub0下进行节点创建时,触发事件,但是再次创建/test/sub0/subsub1节点时,没有触发事件。

Zookeeper事件类型:

   None: 连接建立事件
   NodeCreated: 节点创建
   NodeDeleted: 节点删除
   NodeDataChanged:节点数据变化
   NodeChildrenChanged:子节点列表变化
   DataWatchRemoved:节点监听被移除
   ChildWatchRemoved:子节点监听被移除

4. Zookeeper 的 ACL 权限控制( Access Control List )

Zookeeper 的ACL 权限控制,可以控制节点的读写操作,保证数据的安全性,Zookeeper ACL 权限设置分为 3 部分组成,分别是:权限模式(Scheme)、授权对象(ID)、权限信息(Permission)。最终组成一条例如“scheme🆔permission”格式的 ACL 请求信息。下面我们具体看一下这 3 部分代表什么意思:
Scheme(权限模式):用来设置 ZooKeeper 服务器进行权限验证的方式。ZooKeeper 的权限验证方式大体分为两种类型:

一种是范围验证。所谓的范围验证就是说 ZooKeeper 可以针对一个 IP 或者一段 IP 地址授予某种权限。比如我们可以让一个 IP 地址为“ip:192.168.0.110”的机器对服务器上的某个数据节点具有写入的权限。或者也可以通过“ip:192.168.0.1/24”给一段 IP 地址的机器赋权。

另一种权限模式就是口令验证,也可以理解为用户名密码的方式。在 ZooKeeper 中这种验证方式是 Digest 认证,而 Digest 这种认证方式首先在客户端传送“username:password”这种形式的权限表示符后,ZooKeeper 服务端会对密码 部分使用 SHA-1 和 BASE64 算法进行加密,以保证安全性。

还有一种Super权限模式, Super可以认为是一种特殊的 Digest 认证。具有 Super 权限的客户端可以对 ZooKeeper 上的任意数据节点进行任意操作。

授权对象(ID)

授权对象就是说我们要把权限赋予谁,而对应于 4 种不同的权限模式来说,如果我们选择采用 IP 方式,使用的授权对象可以是一个 IP 地址或 IP 地址段;而如果使用 Digest 或 Super 方式,则对应于一个用户名。如果是 World 模式,是授权系统中所有的用户。

权限信息(Permission)

权限就是指我们可以在数据节点上执行的操作种类,如下所示:在 ZooKeeper 中已经定义好的权限有 5 种:

  • 数据节点(c: create)创建权限,授予权限的对象可以在数据节点下创建子节点;
  • 数据节点(w: wirte)更新权限,授予权限的对象可以更新该数据节点;
  • 数据节点(r: read)读取权限,授予权限的对象可以读取该节点的内容以及子节点的列表信息;
  • 数据节点(d: delete)删除权限,授予权限的对象可以删除该数据节点的子节点;
  • 数据节点(a: admin)管理者权限,授予权限的对象可以对该数据节点体进行 ACL 权限设置。

命令:
getAcl:获取某个节点的acl权限信息
setAcl:设置某个节点的acl权限信息
addauth: 输入认证授权信息,相当于注册用户信息,注册时输入明文密码,zk将以密文的形式存储

可以通过系统参数zookeeper.skipACL=yes进行配置,默认是no,可以配置为true, 则配置过的ACL将不再进行权限检测

生成授权ID的两种方式:
a.代码生成ID:

@Test

public void generateSuperDigest() throws NoSuchAlgorithmException 

    String sId = DigestAuthenticationProvider.generateDigest("gj:test");

    System.out.println(sId);//  gj:X/NSthOB0fD/OT6iilJ55WJVado=


b.在xshell 中生成

echo -n <user>:<password> | openssl dgst -binary -sha1 | openssl base64

设置ACL有两种方式

节点创建的同时设置ACL

create [-s] [-e] [-c]   path [data] [acl]

create /zk-node datatest digest:gj:X/NSthOB0fD/OT6iilJ55WJVado=:cdrwa

或者用setAcl 设置

setAcl /zk-node  digest:gj:X/NSthOB0fD/OT6iilJ55WJVado=:cdrwa

添加授权信息后,不能直接访问,直接访问将报如下异常

get /zk-node

异常信息:

org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /zk-node    

访问前需要添加授权信息

addauth digest gj:test

get /zk-node

datatest

另一种授权模式: auth 明文授权
使用之前需要先
addauth digest username:password 注册用户信息,后续可以直接用明文授权
如:

addauth digest u100:p100

create /node-1 node1data auth:u100:p100:cdwra

这是u100用户授权信息会被zk保存,可以认为当前的授权用户为u100

get /node-1

node1data

IP授权模式:

setAcl /node-ip ip:192.168.109.128:cdwra

create /node-ip  data  ip:192.168.109.128:cdwra

多个指定IP可以通过逗号分隔, 如 setAcl /node-ip ip:IP1:rw,ip:IP2:a

Super 超级管理员模式
这是一种特殊的Digest模式, 在Super模式下超级管理员用户可以对Zookeeper上的节点进行任何的操作。
需要在启动了上通过JVM 系统参数开启:

DigestAuthenticationProvider中定义

-Dzookeeper.DigestAuthenticationProvider.superDigest=super:<base64encoded(SHA1(password))

5. ZooKeeper 内存数据和持久化

Zookeeper数据的组织形式为一个类似文件系统的数据结构,而这些数据都是存储在内存中的,所以我们可以认为,Zookeeper是一个基于内存的小型数据库

内存中的数据:

public class DataTree 

    private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();

    private final WatchManager dataWatches = new WatchManager();

    private final WatchManager childWatches = new WatchManager();

DataNode 是Zookeeper存储节点数据的最小单位

public class DataNode implements Record 

    byte data[];

    Long acl;

    public StatPersisted stat;

    private Set<String> children = null;

事务日志

针对每一次客户端的事务操作,Zookeeper都会将他们记录到事务日志中,当然,Zookeeper也会将数据变更应用到内存数据库中。我们可以在zookeeper的主配置文件zoo.cfg 中配置内存中的数据持久化目录,也就是事务日志的存储路径 dataLogDir. 如果没有配置dataLogDir(非必填), 事务日志将存储到dataDir (必填项)目录,zookeeper提供了格式化工具可以进行数据查看事务日志数据
org.apache.zookeeper.server.LogFormatter

java -classpath .:slf4j-api-1.7.25.jar:zookeeper-3.5.8.jar:zookeeper-jute-3.5.8.jar org.apache.zookeeper.server.LogFormatter /usr/local/zookeeper/apache-zookeeper-3.5.8-bin/data/version-2/log.1

如下是我本地的日志文件格式化效果

从左到右分别记录了操作时间,客户端会话ID,CXID,ZXID,操作类型,节点路径,节点数据(用#+ascii 码表示),节点版本。

Zookeeper进行事务日志文件操作的时候会频繁进行磁盘IO操作,事务日志的不断追加写操作会触发底层磁盘IO为文件开辟新的磁盘块,即磁盘Seek。因此,为了提升磁盘IO的效率,Zookeeper在创建事务日志文件的时候就进行文件空间的预分配- 即在创建文件的时候,就向操作系统申请一块大一点的磁盘块。这个预分配的磁盘大小可以通过系统参数 zookeeper.preAllocSize 进行配置。

事务日志文件名为: log.<当时最大事务ID>,应为日志文件时顺序写入的,所以这个最大事务ID也将是整个事务日志文件中,最小的事务ID,日志满了即进行下一次事务日志文件的创建

数据快照

数据快照用于记录Zookeeper服务器上某一时刻的全量数据,并将其写入到指定的磁盘文件中。
可以通过配置snapCount配置每间隔事务请求个数,生成快照,数据存储在dataDir 指定的目录中,
可以通过如下方式进行查看快照数据( 为了避免集群中所有机器在同一时间进行快照,实际的快照生成时机为事务数达到 [snapCount/2 + 随机数(随机数范围为1 ~ snapCount/2 )] 个数时开始快照)

java -classpath .:slf4j-api-1.7.25.jar:zookeeper-3.5.8.jar:zookeeper-jute-3.5.8.jar org.apache.zookeeper.server.SnapshotFormatter /usr/local/zookeeper/apache-zookeeper-3.5.8-bin/data-dir/version-2/snapshot.0

快照事务日志文件名为: snapshot.<当时最大事务ID>,日志满了即进行下一次事务日志文件的创建

有了事务日志,为啥还要快照数据。
快照数据主要时为了快速恢复,事务日志文件是每次事务请求都会进行追加的操作,而快照是达到某种设定条件下的内存全量数据。所以通常快照数据是反应当时内存数据的状态。事务日志是更全面的数据,所以恢复数据的时候,可以先恢复快照数据,再通过增量恢复事务日志中的数据即可。

2.Zookeeper的客户端使用和集群特性

1.Zookeeper Java 客户端

项目构建
zookeeper 官方的客户端没有和服务端代码分离,他们为同一个jar 文件,所以我们直接引入zookeeper的maven即可, 这里版本请保持与服务端版本一致,不然会有很多兼容性的问题

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.8</version>
</dependency>

创建客户端实例:
为了便于测试,直接在初始化方法中创建zookeeper实例

@Slf4j
public class ZookeeperClientTest 

    private static final String ZK_ADDRESS="192.168.109.200:2181";

    private static final int SESSION_TIMEOUT = 5000;

    private static ZooKeeper zooKeeper;

    private static final String ZK_NODE="/zk-node";


    @Before
    public void init() throws IOException, InterruptedException 
        final CountDownLatch countDownLatch=new CountDownLatch(1);
        zooKeeper=new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, event -> 
            if (event.getState()== Watcher.Event.KeeperState.SyncConnected &&
                    event.getType()== Watcher.Event.EventType.None)
                countDownLatch.countDown();
                log.info("连接成功!");
            
        );
        log.info("连接中....");
        countDownLatch.await();
    

创建Zookeeper实例的方法:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, ZKClientConfig)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider, ZKClientConfig)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[])
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[], boolean, HostProvider)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[], boolean, HostProvider, ZKClientConfig)
ZooKeeper(String connectString, int  sessionTimeout, Watcher watcher, long, byte[], boolean)

同步创建节点:

@Test
public void createTest() throws KeeperException, InterruptedException 
    String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    log.info("created path: ",path);

异步创建节点:

@Test
public void createAsycTest() throws InterruptedException 
     zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
             CreateMode.PERSISTENT,
             (rc, path, ctx, name) -> log.info("rc  ,path ,ctx ,name ",rc,path,ctx,name),"context");
    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);

修改节点数据

@Test
public void setTest() throws KeeperException, InterruptedException 

    Stat stat = new Stat();
    byte[] data = zooKeeper.getData(ZK_NODE, false, stat);
    log.info("修改前: ",new String(data));
    zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());
     byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);
    log.info("修改后: ",new String(dataAfter));

什么是 Curator

Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper 处理比较复杂。而在使用 Curator 时,由于其对这些功能都做了高度的封装,使用起来更加简单,不但减少了开发时间,而且增强了程序的可靠性。

Curator 实战

这里我们以 Maven 工程为例,首先要引入Curator 框架相关的开发包,这里为了方便测试引入了junit ,lombok,由于Zookeeper本身以来了 log4j 日志框架,所以这里可以创建对应的log4j配置文件后直接使用。 如下面的代码所示,我们通过将 Curator 相关的引用包配置到 Maven 工程的 pom 文件中,将 Curaotr 框架引用到工程项目里,在配置文件中分别引用了两个 Curator 相关的包,第一个是 curator-framework 包,该包是对 ZooKeeper 底层 API 的一些封装。另一个是 curator-recipes 包,该包封装了一些 ZooKeeper 服务的高级特性,如:Cache 事件监听、选举、分布式锁、分布式 Barrier。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.8</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.8</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

会话创建

要进行客户端服务器交互,第一步就要创建会话
Curator 提供了多种方式创建会话,比如用静态工厂方式创建:

// 重试策略 
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();

或者使用 fluent 风格创建

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.128.129:2181")
                .sessionTimeoutMs(5000)  // 会话超时时间
                .connectionTimeoutMs(5000) // 连接超时时间
                .retryPolicy(retryPolicy)
                .namespace("base") // 包含隔离名称
                .build();
client.start();

这段代码的编码风格采用了流式方式,最核心的类是 CuratorFramework 类,该类的作用是定义一个 ZooKeeper 客户端对象,并在之后的上下文中使用。在定义 CuratorFramework 对象实例的时候,我们使用了 CuratorFrameworkFactory 工厂方法,并指定了 connectionString 服务器地址列表、retryPolicy 重试策略 、sessionTimeoutMs 会话超时时间、connectionTimeoutMs 会话创建超时时间。下面我们分别对这几个参数进行讲解:

connectionString:服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3;port3 。

retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。


超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。

创建节点

创建节点的方式如下面的代码所示,回顾我们之前课程中讲到的内容,描述一个节点要包括节点的类型,即临时节点还是持久节点、节点的数据信息、节点是否是有序节点等属性和性质。

 @Test
public void testCreate() throws Exception 
    String path = curatorFramework.create().forPath("/curator-node");
    // curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node","some-data".getBytes())
    log.info("curator create node :  successfully.",path);

在 Curator 中,可以使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息。

一次性创建带层级结构的节点

@Test
public void testCreateWithParent() throws Exception 
    String pathWithParent="/node-parent/sub-node-1";
    String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
    log.info("curator create node :  successfully.",path);

获取数据

@Test
public void testGetData() throws Exception 
    byte[] bytes = curatorFramework.getData().forPath("/curator-node");
    log.info("get data from  node :  successfully.",new String(bytes));

更新节点

我们通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点,在setData 方法的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。

@Test
public void testSetData() throws Exception 
    curatorFramework.setData().forPath("/curator-node","changed!".getBytes());
    byte[] bytes = curatorFramework.getData().forPath("/curator-node");
    log.info("get data from  node /curator-node :  successfully.",new String(bytes));

删除节点

@Test
public void testDelete() throws Exception 
    String pathWithParent="/node-parent";
    curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);

guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。

deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。

异步接口
Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。

public interface BackgroundCallback

    /**
     * Called when the async background operation completes
     *
     * @param client the client
     * @param event operation result details
     * @throws Exception errors
     */
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;

如上接口,主要参数为 client 客户端, 和 服务端事件 event
inBackground 异步处理默认在EventThread中执行

@Test
public void test() throws Exception 
    curatorFramework.getData().inBackground((item1, item2) -> 
        log.info(" background: ", item2);
    ).forPath(ZK_NODE);

    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);

指定线程池

@Test
public void test() throws Exception 
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    
    curatorFramework.getData().inBackground((item1, item2) -> 
        log.info(" background: ", item2);
    ,executorService).forPath(ZK_NODE);

    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);

Curator 监听器

/**
 * Receives notifications about errors and background events
 */
public interface CuratorListener

    /**
     * Called when a background task has completed or a watch has triggered
     *
     * @param client client
     * @param event the event
     * @throws Exception any errors
     */
    public void         eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;

针对 background 通知和错误通知。使用此监听器之后,调用inBackground 方法会异步获得监听

Curator Caches
Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。Cache 分为两类注册类型:节点监听和子节点监听。

node cache
NodeCache 对某一个节点进行监听

public NodeCache(CuratorFramework client,
                         String path)
Parameters:
client - the client
path - path to cache

可以通过注册监听器来实现,对当前节点数据变化的处理

public void addListener(NodeCacheListener listener)
     Add a change listener
Parameters:
listener - the listener
@Slf4j
public class NodeCacheTest extends AbstractCuratorTest

    public static final String NODE_CACHE="/node-cache";

    @Test
    public void testNodeCacheTest() throws Exception 

        createIfNeed(NODE_CACHE);
        NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);
        nodeCache.getListenable().addListener(new NodeCacheListener() 
            @Override
            public void nodeChanged() throws Exception 
                log.info(" path nodeChanged: ",NODE_CACHE);
                printNodeData();
            
        );

        nodeCache.start();
    


    public void printNodeData() throws Exception 
        byte[] bytes = curatorFramework.getData().forPath(NODE_CACHE);
        log.info("data: ",new String(bytes));
    

path cache
PathChildrenCache 会对子节点进行监听,但是不会对二级子节点进行监听,

public PathChildrenCache(CuratorFramework client,
                         String path,
                         boolean cacheData)
Parameters:
client - the client
path - path to watch
cacheData - if true, node contents are cached in addition to the stat

可以通过注册监听器来实现,对当前节点的子节点数据变化的处理

public void addListener(PathChildrenCacheListener listener)
     Add a change listener
Parameters:
listener - the listener
@Slf4j
public class PathCacheTest extends AbstractCuratorTest

    public static final String PATH="/path-cache";

    @Test
    public void testPathCache() throws Exception 

        createIfNeed(PATH);
        PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() 
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception 
                log.info("event:  ",event);
            
        );

        // 如果设置为true则在首次启动时就会缓存节点内容到Cache中
        pathChildrenCache.start(true);
    

tree cache:
TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。

public TreeCache(CuratorFramework client,
                         String path,
                         boolean cacheData)
Parameters:
client - the client
path - path to watch
cacheData - if true, node contents are cached in addition to the stat

可以通过注册监听器来实现,对当前节点的子节点,及递归子节点数据变化的处理

public void addListener(TreeCacheListener listener)
     Add a change listener
Parameters:
listener - the listener

@Slf4j
public class TreeCacheTest extends AbstractCuratorTest

    public static final String TREE_CACHE="/tree-path";

    @Test
    public void testTreeCache() throws Exception 
        createIfNeed(TREE_CACHE);
        TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);
        treeCache.getListenable().addListener(new TreeCacheListener() 
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception 
                log.info(" tree cache: ",event);
            
        );
        treeCache.start();
    

2.Zookeeper 集群模式

Zookeeper 集群模式一共有三种类型的角色

Leader: 处理所有的事务请求(写请求),可以处理读请求,集群中只能有一个Leader
Follower:只能处理读请求,同时作为 Leader的候选节点,即如果Leader宕机,Follower节点要参与到新的Leader选举中,有可能成为新的Leader节点。
Observer:只能处理读请求。不能参与选举

Zookeeper集群模式安装

本例搭建的是伪集群模式,即一台机器上启动四个zookeeper实例组成集群,真正的集群模式无非就是实例IP地址不同,搭建方法没有区别
Step1:配置JAVA环境,检验环境:保证是jdk7 及以上即可

java -version

Step2:下载并解压zookeeper

wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
cd  apache-zookeeper-3.5.8-bin

Step3:重命名 zoo_sample.cfg文件

 cp conf/zoo_sample.cfg conf/zoo-1.cfg

Step4:修改配置文件zoo-1.cfg,原配置文件里有的,修改成下面的值,没有的则加上

# vim conf/zoo-1.cfg
dataDir=/usr/local/data/zookeeper-1
clientPort=2181
server.1=127.0.0.1:2001:3001:participant// participant 可以不用写,默认就是participant
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer

配置说明

  • tickTime:用于配置Zookeeper中最小时间单位的长度,很多运行时的时间间隔都是使用tickTime的倍数来表示的。
  • initLimit:该参数用于配置Leader服务器等待Follower启动,并完成数据同步的时间。Follower服务器再启动过程中,会与Leader建立连接并完成数据的同步,从而确定自己对外提供服务的起始状态。Leader服务器允许Follower再initLimit 时间内完成这个工作。
  • syncLimit:Leader 与Follower心跳检测的最大延时时间
  • dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
  • clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
  • server.A=B:C:D:E 其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。如果需要通过添加不参与集群选举以及事务请求的过半机制的 Observer节点,可以在E的位置,添加observer标识。
    Step4:再从zoo-1.cfg复制三个配置文件zoo-2.cfg,zoo-3.cfg和zoo-4.cfg,只需修改dataDir和clientPort不同即可
cp conf/zoo1.cfg conf/zoo2.cfg
cp conf/zoo1.cfg conf/zoo3.cfg
cp conf/zoo1.cfg conf/zoo4.cfg
vim conf/zoo2.cfg
dataDir=/usr/local/data/zookeeper2
clientPort=2182
vim conf/zoo3.cfg
dataDir=/usr/local/data/zookeeper3
clientPort=2183
vim conf/zoo4.cfg
dataDir=/usr/local/data/zookeeper4
clientPort=2184

Step5:标识Server ID
创建四个文件夹/usr/local/data/zookeeper-1,/usr/local/data/zookeeper-2,/usr/local/data/zookeeper-3,/usr/local/data/zookeeper-4,在每个目录中创建文件myid 文件,写入当前实例的server id,即1,2,3,4

 cd /usr/local/data/zookeeper-1
 vim myid
 1 
 cd /usr/local/data/zookeeper-2
 vim myid
 2 
 cd /usr/local/data/zookeeper-3
 vim myid
 3 
cd /usr/local/data/zookeeper-4
vim myid
4

Step6:启动三个zookeeper实例

bin/zkServer.sh start conf/zoo1.cfg
bin/zkServer.sh start conf/zoo2.cfg
bin/zkServer.sh start conf/zoo3.cfg

Step7:检测集群状态,也可以直接用命令 zkServer.sh status conf/zoo1.cfg 进行每台服务的状态查询

bin/zkCli.sh -server ip1:port1,ip2:port2,ip3:port3 

可以通过 查看/zookeeper/config 节点数据来查看集群配置

Zookeeper 3.5.0 新特性: 集群动态配置

Zookeeper 3.5.0 以前,Zookeeper集群角色要发生改变的话,只能通过停掉所有的Zookeeper服务,修改集群配置,重启服务来完成,这样集群服务将有一段不可用的状态,为了应对高可用需求,Zookeeper 3.5.0 提供了支持动态扩容/缩容的 新特性。但是通过客户端API可以变更服务端集群状态是件很危险的事情,所以在zookeeper 3.5.3 版本要用动态配置,需要开启超级管理员身份验证模式 ACLs。如果是在一个安全的环境也可以通过配置 系统参数 -Dzookeeper.skipACL=yes 来避免配置维护acl 权限配置。

第一步,按照上节课的方式,先配置一个超级管理员(如果不配管理员,也可以设置系统参数 -Dzookeeper.skipACL=yes):如:

在zookeeper启动脚本中添加 超级管理员授权模式:

echo -n gj:123 | openssl dgst -binary -sha1 | openssl base64 
// RRCKWv2U2e99M6UmsFaJiQ2xStw=

-Dzookeeper.DigestAuthenticationProvider.superDigest=gj:RRCKWv2U2e99M6UmsFaJiQ2xStw=

配置动态文件

修改配置 zoo1.cfg
注意这里去除了端口号,添加了
reconfigEnabled : 设置为true 开启动态配置
dynamicConfigFile : 指定动态配置文件的路径


创建文件 zoo_replicated1.cfg.dynamic

动态配置文件,加入集群信息
server.A=B.C.D.E;F
A: 服务的唯一标识
B: 服务对应的IP地址,
C: 集群通信端口
D: 集群选举端口
E: 角色, 默认是 participant,即参与过半机制的角色,选举,事务请求过半提交,还有一个是observer, 观察者,不参与选举以及过半机制。
之后是一个分号,一定是分号,
F:服务IP:端口

server.1=192.168.109.200:2001:3001:participant;192.168.109.200:2181
server.2=192.168.109.200:2002:3002:participant;192.168.109.200:2182
server.3=192.168.109.200:2003:3003:participant;192.168.109.200:2183

依次配置其他服务 zoo2.cfg ,zoo3.cfg注意数据文件的路径

依次启动所有服务
如: ./bin/zkServer.sh  start   conf/zoo1.cfg  
查看集群状态:
     ./bin/zkServer.sh  status  conf/zoo1.cfg  

连上任意一台服务器:

 查看集群配置
 config  // 将会把动态配置打印出来
 也可以直接查看目录
 /zookeeper/config 
 该节点存储了集群信息

 如果要修改集群状态,需要授权登录
 addauth digest gj:123  

 reconfig -remove 3  // 移除serverId为 3 的机器
 // 把对应的机器加进来
 reconfig -add server.3=192.168.109.200:2003:3003:participant;192.168.109.200:2183

如果要变更/或者添加新的服务需要将服务加到配置文件 zoo_replicated1.cfg.dynamic 中,启动服务
然后通过reconfig 命令进行添加或者变更服务角色,但是需要保证服务列表中 participant 角色能够形成集群(过半机制)。

客户端可以通过监听 /zookeeper/confg 节点,来感知集群的变化。从而实现集群的动态变更.
Zookeeper 类提供了对应的API 用来更新服务列表 : updateServerList
(完整的工程代码,在课程对应的资料包中)

 Watcher watcher = new Watcher() 
    @Override
    public void process(WatchedEvent event) 
                 if (event.getType() == Event.EventType.None
                         && event.getState() == Event.KeeperState.SyncConnected)
                           countDownLatch.countDown();
                           log.info(" 连接建立");
                           // start to watch config
                     try 
                         log.info(" 开始监听:",ZooDefs.CONFIG_NODE);
                         zookeeper.getConfig(true,null);
                      catch (KeeperException e) 
                         e.printStackTrace();
                      catch (InterruptedException e) 
                         e.printStackTrace();
                     
                 else if( event.getPath()!=null  &&  event.getPath().equals(ZooDefs.CONFIG_NODE))
                     try 
                         byte[] config = zookeeper.getConfig(this, null);
                         String clientConfigStr = ConfigUtils.getClientConfigStr(new String(config));
                         log.info(" 配置发生变更: ",clientConfigStr);
                         zookeeper.updateServerList(clientConfigStr.split(" ")[1]);
                      catch (KeeperException e) 
                         e.printStackTrace();
                      catch (InterruptedException e) 
                         e.printStackTrace();
                      catch (IOException e) 
                         e.printStackTrace();
                     

                 
    
;

Curator 也自带了动态配置的监听,不需要额外的配置和代码实现监听更新;

3.Zookeeper典型使用场景实战

Zookeeper分布式锁实战

Zookeeper 分布式锁加锁原理

如上实现方式在并发问题比较严重的情况下,性能会下降的比较厉害,主要原因是,所有的连接都在对同一个节点进行监听,当服务器检测到删除事件时,要通知所有的连接,所有的连接同时收到事件,再次并发竞争,这就是羊群效应。这种加锁方式是非公平锁的具体实现:如何避免呢,我们看下面这种方式。


如上借助于临时顺序节点,可以避免同时多个节点的并发竞争锁,缓解了服务端压力。这种实现方式所有加锁请求都进行排队加锁,是公平锁的具体实现。

前面这两种加锁方式有一个共同的特质,就是都是互斥锁,同一时间只能有一个请求占用,如果是大量的并发上来,性能是会急剧下降的,所有的请求都得加锁,那是不是真的所有的请求都需要加锁呢?答案是否定的,比如如果数据没有进行任何修改的话,是不需要加锁的,但是如果读数据的请求还没读完,这个时候来了一个写请求,怎么办呢?有人已经在读数据了,这个时候是不能写数据的,不然数据就不正确了。直到前面读锁全部释放掉以后,写请求才能执行,所以需要给这个读请求加一个标识(读锁),让写请求知道,这个时候是不能修改数据的。不然数据就不一致了。如果已经有人在写数据了,再来一个请求写数据,也是不允许的,这样也会导致数据的不一致,所以所有的写请求,都需要加一个写锁,是为了避免同时对共享数据进行写操作。

举个例子

1、读写并发不一致

2、双写不一致情况

Zookeeper 共享锁实现原理

注册中心实战

注册中心场景分析:

  1. 在分布式服务体系结构比较简单的场景下,我们的服务可能是这样的

    现在 Order-Service 需要调用外部服务的 User-Service ,对于外部的服务依赖,我们直接配置在我们的服务配置文件中,在服务调用关系比较简单的场景,是完全OK的。随着服务的扩张,User-Service 可能需要进行集群部署,如下:

    如果系统的调用不是很复杂,可以通过配置管理,然后实现一个简单的客户端负载均衡也是OK的,但是随着业务的发展,服务模块进行更加细粒度的划分,业务也变得更加复杂,再使用简单的配置文件管理,将变得难以维护。当然我们可以再前面加一个服务代理,比如nginx做反向代理, 如下:

    如果我们是如下场景呢?

服务不再是A-B,B-C 那么简单,而是错综复杂的微小服务的调用

这个时候我们可以借助于Zookeeper的基本特性来实现一个注册中心,什么是注册中心,顾名思义,就是让众多的服务,都在Zookeeper中进行注册,啥是注册,注册就是把自己的一些服务信息,比如IP,端口,还有一些更加具体的服务信息,都写到 Zookeeper节点上, 这样有需要的服务就可以直接从zookeeper上面去拿,怎么拿呢? 这时我们可以定义统一的名称,比如,User-Service, 那所有的用户服务在启动的时候,都在User-Service 这个节点下面创建一个子节点(临时节点)&#x

以上是关于Zookeeper的主要内容,如果未能解决你的问题,请参考以下文章

zookeeper的ZAB协议的原理以及底层源码实现超级详解

一步到位分布式开发Zookeeper实现集群管理

Windows 8提升普通管理员权限为超级管理员权限以及激活超级管理员Administrator

zookeeper源码之临时节点管理

Zookeeper在正常配置下启动成功,但是在查看状态时仍然会报错的解决方法(超级详细保姆级讲解)

zookeeper怎么配置步骤