SpringCloud Alibaba系列一文全面解析Zookeeper安装常用命令JavaAPI操作Watch事件监听分布式锁集群搭建核心理论

Posted 蓝染-惣右介

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud Alibaba系列一文全面解析Zookeeper安装常用命令JavaAPI操作Watch事件监听分布式锁集群搭建核心理论相关的知识,希望对你有一定的参考价值。

文章目录


Zookeeper


学习目标

  • Zookeeper简介
  • ZooKeeper安装与配置
  • ZooKeeper命令操作
  • ZooKeeperJavaAPI操作
  • ZooKeeper集群搭建
  • Zookeeper核心理论

一、简介

Zookeeper是 Apache Hadoop项目下的一个子项目,是一个树形目录服务。
Zookeeper翻译过来就是动物园管理员,他是用来管Hadoop(大象)、Hive(蜜蜂)、Pig(小猪)的管理员。简称ZK。

Zookeeper是一个分布式的、开源的分布式应用程序的协调服务。
官网:https://zookeeper.apache.org/


二、应用场景

Zookeeper是一个经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调存储服务。

Zookeeper提供的主要功能包括:

  • 作为配置中心维护配置信息
  • 提供分布式锁服务
  • 作为注册中心实现集群管理
  • 生成分布式唯一ID
  1. 配置管理


java编程经常会遇到配置项,比如数据库的url、 schema、user和 password等。通常这些配置项我们会放置在配置文件中,再将配置文件放置在服务器上当需要更改配置项时,需要去服务器上修改对应的配置文件。
但是随着分布式系统的兴起,由于许多服务都需要使用到该配置文件,因此有必须保证该配置服务的高可用性(highavailability)和各台服务器上配置数据的一致性。
通常会将配置文件部署在一个集群上,然而一个集群动辄上千台服务器,此时如果再一台台服务器逐个修改配置文件那将是非常繁琐且危险的的操作,因此就需要一种服务能够高效快速且可靠地完成配置项的更改等操作,并能够保证各配置项在每台服务器上的数据一致性。
zookeeper就可以提供这样一种服务,其使用Zab这种一致性协议来保证一致性。现在有很多开源项目使用zookeeper来维护配置,如在 hbase中,客户端就是连接一个 zookeeper,获得必要的 hbase集群的配置信息,然后才可以进一步操作。还有在开源的消息队列 kafka中,也便用zookeeper来维护 brokers的信息。在 alibaba开源的soa框架dubbo中也广泛的使用zookeeper管理一些配置来实现服务治理。

  1. 分布式锁


一个集群是一个分布式系统,由多台服务器组成。为了提高并发度和可靠性,多台服务器上运行着同一种服务。当多个服务在运行时就需要协调各服务的进度,有时候需要保证当某个服务在进行某个操作时,其他的服务都不能进行该操作,即对该操作进行加锁,如果当前机器挂掉后,释放锁并 fail over到其他的机器继续执行该服务。

  1. 集群管理


一个集群有时会因为各种软硬件故障或者网络故障,出现棊些服务器挂掉而被移除集群,而某些服务器加入到集群中的情况,zookeeper会将这些服务器加入/移出的情况通知给集群中的其他正常工作的服务器,以及时调整存储和计算等任务的分配和执行等。此外zookeeper还会对故障的服务器做出诊断并尝试修复。

  1. 生成分布式唯一ID

在过去的单库单表型系统中,通常可以使用数据库字段自带的auto_increment属性来自动为每条记录生成一个唯一的ID。
但是分库分表后,就无法在依靠数据库的auto_increment属性来唯一标识一条记录了。此时我们就可以用Zookeeper在分布式环境下生成全局唯一ID。
做法如下:每次要生成一个新id时,创建一个持久顺序节点,创建操作返回的节点序号,即为新id,然后把比自己节点小的删除即可。


三、设计目标

Zookeeper致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调服务。

  1. 高性能

Zookeeper将全量数据存储在内存中,并直接服务于客户端的所有非事务请求,尤其用于以读为主的应用场景。

  1. 高可用

Zookeeper一般以集群的方式对外提供服务,一般3~5台机器就可以组成一个可用的 Zookeeper集群了,每台机器都会在内存中维护当前的服务器状态,井且每台机器之间都相互保持着通信。只要集群中超过一半的机器都能够正常工作,那么整个集群就能够正常对外服务。

  1. 严格顺序访问

对于来自客户端的每个更新请求,Zookeeper都会分配一个全局唯一的递增编号,这个编号反应了所有事务操作的先后顺序。


四、数据模型

ZooKeeper是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。这里面的每一个节点都被称为:ZNode,每个节点上都会保存自己的数据和节点信息。
节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
节点可以分为四大类:

  • PERSISTENT持久化节点
  • EPHEMERAL 临时节点:-e
  • PERSISTENT_SEQUENTIAL持久化顺序节点:-s
  • EPHEMERAL_SEQUENTIAL临时顺序节点:-es


那么如何描述一个znode呢?一个znode大体上分为3个部分:

  • 结点的数据:即znode data(结点path,结点data)的关系就像是Java map中的 key value关系。
  • 结点的子结点children
  • 结点的状态stat:用来描述当前结点的创建、修改记录,包括cZxidctime等。

结点类型
zookeeper中的结点有两种,分别为临时结点永久结点。结点的类型在创建时被确定,并且不能改变

  • 临时节点:
    • 该节点的生命周期依赖于创建它们的会话。一旦会话( Session)结束,临时节点将被自动删除,当然可以也可以手动删除。虽然每个临时的 Znode都会绑定到一个客户端会话,但他们对所有的客户端还是可见的。另外,Zookeeper的临时节点不允许拥有子节点。
  • 持久化结点:
    • 该结点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,它们才能被删除。

五、单机安装

  • 测试系统环境CentOS7.9
  • zookeeper:apache-zookeeper-3.5.6-bin.tar.gz
  • jdk(v1.8及以上):jdk-8u171-linux-x64.tar.gz

下载地址:http://archive.apache.org/dist/zookeeper/

  1. zookeeper底层依赖于jdk,它运行在JVM之上。根目录下先进行jdk的安装(已安装可跳过),jdk使用 jdk-8u171-linux-x64.tar.gz
tar -zxvf jdk-8u171-linux-x64.tar.gz
  1. 配置jdk环境变量。
vim /etc/profile
JAVA_HOME=/usr/local/jdk1.8.0_171
export JAVA_HOME

PATH=$JAVA_HOME/bin:$PATH
export PATH

source /etc/profile
  1. 检测jdk安装java -version,如果反馈了Java信息,则成功。
  2. 将下载的ZooKeeper放到/opt/zookeeper目录下。
# 上传zookeeper安装包到服务器
put f:/setup/apache-zookeeper-3.5.6-bin.tar.gz
# 在/opt目录下创建zookeeper目录
mkdir /opt/zookeeper
# 将zookeeper安装包移动到 /opt/zookeeper
mv apache-zookeeper-3.5.6-bin.tar.gz /opt/zookeeper/
  1. 将tar包解压到/opt/zookeeper目录下。
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /opt/zookeeper/
  1. 为zookeeper准备配置文件zoo.cfg,进入到conf目录拷贝一个zoo_sample.cfg并完成配置,修改zoo.cfg。
# 进入到conf目录
cd /opt/zookeeper/apache-zookeeper-3.5.6-bin/conf/
# 拷贝 (配置文件必须叫zoo.cfg才能生效)
cp zoo_sample.cfg zoo.cfg
# 打开目录
cd /opt/zookeeper/
# 创建zookeeper存储目录
mkdir zkdata
# 修改zoo.cfg
vim /opt/zookeeper/apache-zookeeper-3.5.6-bin/conf/zoo.cfg
# 修改存储目录
将zk默认存储数据的临时目录dataDir=/tmp/zookeeper,修改为:dataDir=/opt/zookeeper/zkdata

  1. 2181是ZK监听客户端连接的默认端口号,记得防火墙把这个端口给开放出来,不然客户端连接不上。
# 添加2181端口到防火墙允许列表
firewall-cmd --zone=public --add-port=2181/tcp --permanent
# 立即生效
firewall-cmd --reload
# 查看Linux防火墙允许通过的端口号
firewall-cmd --zone=public --list-ports
  1. 启动zookeeper
# 进入zookeeper的bin目录
cd /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/
# 启动zookeeper
./zkServer.sh start

# 查看ZooKeeper状态
#./zkServer.sh status
# 停止ZooKeeper
#./zkServer.sh stop
# 开启ZooKeeper
#./zkServer.sh start
# 重启ZooKeeper
#./zkServer.sh restart


# 进入zookeeper客户端内部
./zkCli.sh

zookeeper单节点standalone启动成功。


六、命令操作

1. 服务端常用命令

  • 启动ZooKeeper服务:./zkServer.sh start
  • 查看ZooKeeper服务状态:./zkServer.sh status
  • 停止ZooKeeper服务:./zkServer.sh stop
  • 重启ZooKeeper服务:./zkServer.sh restart

2. 客户端常用命令

  1. ./zkCli.sh -server ip:portZookeeper客户端连接服务端。
# Zookeeper客户端连接服务端,-server指定ip:port
./zkCli.sh -server localhost:2181
# 如果登录连接本机,可以省略不写
./zkCli.sh
  1. quit退出客户端,断开连接。
  2. ls 结点路径查看该结点下的子结点。

  1. create 结点目录 结点数据根据路径创建该结点,并指定该结点数据,如果未指定该结点数据,则为null;若相同结点路径已经存在,则不能创建相同结点路径。
  2. get 结点路径查看该结点的数据。
  3. set 结点路径 结点数据为该结点设置数据。
  4. delete 结点路径删除该结点,如果该结点下存在子结点,则无法直接删除多层结点。
  5. deleteall 多层结点路径删除带有子结点的结点。


3. 创建临时顺序结点

  1. create -e 结点路径 结点数据创建临时结点,只在当前会话窗口内有效,quit断开连接后无效。
  2. create -s 结点路径 结点数据创建顺序结点,该结点后面会自动加上一个数字编号,所有的结点都使用同一套数字编号。
  3. ls -s 结点路径ls2 结点路径查询结点详细信息。

czxid:节点被创建的事务ID。
dataversion:数据版本号。
ctime:创建时间。
aclversion:权限版本号。
mzxid:最后一次被更新的事务ID。
ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0。
mtime:修改时间。
pzxid:子节点列表最后一次被更新的事务ID。
dataLength:节点存储的数据的长度。
cversion:子节点的版本号。
numChildren:当前节点的子节点个数。


七、JavaAPI操作

1. Curator介绍

Curator是 Apache ZooKeeper 的Java客户端库。常见的ZooKeeper Java APl:

  • 原生Java APl
  • ZkClient
  • Curator

Curator项目的目标是简化ZooKeeper客户端的使用。
Curator最初是Netfix研发的,后来捐献了Apache基金会,目前是Apache的顶级项目。
官网:http://curator.apache.org/(zk版本3.5+,curator版本需要用4.0+)

  • 建立连接
  • 添加节点
  • 删除节点
  • 修改节点
  • 查询节点
  • Watch事件监听
  • 分布式锁实现

2. 建立连接

相关方法解释说明
static CuratorFrameworkFactory.newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)第一种建立客户端连接的方式,通过方法参数设置连接信息。
CuratorFrameworkFactory.builder().connectString(String connectString).sessionTimeoutMs(int sessionTimeoutMs).connectionTimeoutMs(int connectionTimeoutMs).retryPolicy(RetryPolicy retryPolicy).namespace(String path).build();第二种建立客户端连接的方式,通过链式编程设置连接信息。

1)创建一个空项目zk-pro,里面创建一个maven项目curator-zk

2)导入pom.xmllog4j.properties

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itheima</groupId>
    <artifactId>curator-zk</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
        <!-- curator -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
        <!-- 日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
log4j.rootLogger=off,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%dyyyy-MM-dd HH/:mm/:ss]%-5p %c(line/:%L) %x-%m%n

3)创建测试类CuratorTest,编写测试建立连接方法。

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;

public class CuratorTest 
    /**
     * 建立客户端连接 Create a new client
     */
    @Test
    public void testConnect() 
        /*
            第一种方式:
            static CuratorFrameworkFactory.newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
            @param connectString       连接字符串,zk server的地址和端口 "192.168.8.100:2181,192.168.8.101:2181,..."
            @param sessionTimeoutMs    会话超时时间,单位ms
            @param connectionTimeoutMs 连接超时时间,单位ms
            @param retryPolicy         重试策略
            @return client
         */
        // 重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); // 3秒重试一次,最多重试10次
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.8.100:2181", 60 * 1000, 15 * 1000, retryPolicy);
        // 开启连接
        client.start();
        /*
            第二种方式:链式编程
            static CuratorFrameworkFactory.builder()
            @param connectString       连接字符串,zk server的地址和端口 "192.168.8.100:2181,192.168.8.101:2181,..."
            @param sessionTimeoutMs    会话超时时间,单位ms
            @param connectionTimeoutMs 连接超时时间,单位ms
            @param retryPolicy         重试策略
            @return client
         */
        CuratorFramework client2 = CuratorFrameworkFactory.builder().connectString("192.168.8.100:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(new ExponentialBackoffRetry(3000, 10))
                .namespace("itheima").build();
        // 开启连接
        client2.start();
    


源码解读
Ctrl+左键点击newClient()静态方法跟进查看。

/**
* Create a new client
*
* @param connectString       list of servers to connect to
* @param sessionTimeoutMs    session timeout
* @param connectionTimeoutMs connection timeout
* @param retryPolicy         retry policy to use
* @return client
*/
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

	return builder().
    connectString(connectString).
    sessionTimeoutMs(sessionTimeoutMs).
    connectionTimeoutMs(connectionTimeoutMs).
    retryPolicy(retryPolicy).
    build();

方法内部其实是调用了builder()方法做了链式编程,该方法参数有4个,分别是:

方法参数解释说明
String connectString连接字符串,zk server的地址和端口,可集群填写"192.168.8.100:2181,192.168.8.101:2181,…"
int sessionTimeoutMs会话超时时间,单位ms
int connectionTimeoutMs连接超时时间,单位ms
RetryPolicy retryPolicy重试策略

对于重试策略RetryPolicy,继续跟进查看源码,发现这其实是一个接口。

/**
 * Abstracts the policy to use when retrying connections
 */
public interface RetryPolicy

    /**
     * Called when an operation has failed for some reason. This method should return
     * true to make another attempt.
     *
     *
     * @param retryCount the number of times retried so far (0 the first time)
     * @param elapsedTimeMs the elapsed time in ms since the operation was attempted
     * @param sleeper use this to sleep - DO NOT call Thread.sleep
     * @return true/false
     */
    public boolean      allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);

那么如何知道它的实现类呢?这里有个小技巧:点击其左侧的绿色接口小图标。

  • 对于接口,它会提示出该接口的实现类对象。
  • 对于接口内的抽象方法,它会提示出在哪个实现类中对该方法做了实现。

除此之外的小技巧,还有Ctrl+F12,可以查看当前类或接口的家族继承关系结构。

另外,对于第二种方式中的namespace(“xxx”)设置命名空间,这个其实是相当于给根结点设置一个根结点路径前缀/xxx,多个应用时设置名称空间起到隔离的作用,同时方便管理。

4)测试运行该测试类,发现绿了就说明测试成功,没问题!

3. 添加结点

方法参数解释说明
create().forPath(String path)基本创建结点。
create().forPath(String path, byte[] data)创建结点并设置数据。
create().withMode(CreateMode mode).forPath(String path)通过枚举类CreateMode设置结点的类型。如:持久结点,临时结点,顺序结点,可进行模式组合。
create().creatingParentsIfNeeded().forPath(String MultiLevelPath)创建多级结点,如果父节点不存在,则创建父节点。
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorTest 
    private CuratorFramework client;

    /**
     * 建立客户端连接 Create a new client
     */
    @Before
    public void testConnectClient() 
        /*
            第一种方式:
            static CuratorFrameworkFactory.newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
            @param connectString       连接字符串,zk server的地址和端口 "192.168.8.100:2181,192.168.8.101:2181,..."
            @param sessionTimeoutMs    会话超时时间,单位ms
            @param connectionTimeoutMs 连接超时时间,单位ms
            @param retryPolicy         重试策略
            @return client
         */
        // 重试策略
        /*RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); // 3秒重试一次,最多重试10次
        client = CuratorFrameworkFactory.newClient("192.168.8.100:2181", 60 * 1000, 15 * 1000, retryPolicy);
        // 开启连接
        client.start();*/
        /*
            第二种方式:链式编程
            static CuratorFrameworkFactory.builder()
            @param connectString       连接字符串,zk server的地址和端口 "192.168.8.100:2181,192.168.8.101:2181,..."
            @param sessionTimeoutMs    会话超时时间,单位ms
            @param connectionTimeoutMs 连接超时时间,单位ms
            @param retryPolicy         重试策略
            @return client
         */
        client = CuratorFrameworkFactory.builder().connectString("192.168.8.100:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(new ExponentialBackoffRetry(3000, 10))
                .namespace("itheima").build();
        // 开启连接
        client.start();
    

    /**
     * 创建节点:create (持久 临时 顺序 可进行模式组合) 数据
     * 1.基本创建                   create().forPath("/xxx");
     * 2.创建节点,带有数据          create().forPath("/xxx", dataOfBytesArray);
     * 3.设器节点的类型              create().withMode(CreateMode.XXX).forPath("/xxx");
     * 4、创建多级节点                create().creatingParentsIfNeeded().forPath("/xxx/xxx");
     */
    @Test
    public void testCreateNode1() throws Exception 
        // 1.基本创建
        String path = client.create().forPath("/app1");
        System.out.println(path);
    

    @Test
    public void testCreateNode2() throws Exception 
        // 2.创建节点,带有数据
        // 如果创建结点&#x

以上是关于SpringCloud Alibaba系列一文全面解析Zookeeper安装常用命令JavaAPI操作Watch事件监听分布式锁集群搭建核心理论的主要内容,如果未能解决你的问题,请参考以下文章

springcloud alibaba企业落地实战:一文带你掌握nacos基础应用

springcloud alibaba企业落地实战:一文带你掌握nacos基础应用

springcloud alibaba企业落地实战:一文带你掌握nacos基础应用

SpringCloud alibaba实战系列文章说明

SpringCloud alibaba实战系列文章汇总

SpringCloud-Alibaba系列教程2.搭建用户微服务模块