Apache ZooKeeper

Posted xue_yun_xiang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache ZooKeeper相关的知识,希望对你有一定的参考价值。

一、概述

在分布式环境下,如果舍弃SpringCloud,使用其他的分布式框架,那么注册心中,配置集中管理,集群管理,分布式锁,队列的管理想单独实现怎么办。

Zookeeper本身是Hadoop生态园的中的一个组件,Zookeeper强大的功能,在Java分布式架构中,也会频繁的使用到Zookeeper。Zookeeper就是一个文件系统 + 监听通知机制

存储结构

znode 就是一个节点 ,znode 可以存储数据,还可以存储 子节点
​ 相当于一个文件夹,这文件夹可以存数据,还可以内部存储子文件夹


节点类型

永久节点:只要不删除一致存在
临时节点:只在客户端连接时,节点存在 ,一旦断开,节点自动删除
有序节点:节点是有顺序的,从0000001 0000002.。。。。 向上加 有序的

组合:

永久有序,临时有序

二、安装

docker-compose.yml

version: “3.1”
services:
zk:
image: daocloud.io/daocloud/zookeeper:latest
restart: always
container_name: zk
ports:
- 2181:2181

启动

docker-compose up

进入zk 命令

[root@mastera docker-compose-zk1]# docker exec -it zk bash #进入容器内部
root@a418197ed774:/opt/zookeeper# cd bin/
root@a418197ed774:/opt/zookeeper/bin# ls
root@a418197ed774:/opt/zookeeper/bin# ./zkCli.sh #进入zk 命令行


查看zk 节点列表

[zk: localhost:2181(CONNECTED) 4] ls /
[xiaoming, zookeeper]

创建节点:create 路径 数据

create /xiaoming xiaoming

获取节点中的数据

[zk: localhost:2181(CONNECTED) 5] get /xiaoming
xiaoming

修改节点中的数据

[zk: localhost:2181(CONNECTED) 6] set /xiaoming xiaoming_v1

delete 只能删除末端节点 ,如果节点中有字节点,无法删除

[zk: localhost:2181(CONNECTED) 9] delete /xiaoming
[zk: localhost:2181(CONNECTED) 16] delete /znode1
Node not empty: /znode1

强制删除所有子节点

[zk: localhost:2181(CONNECTED) 17] rmr /znode1

退出zk命令行

[zk: localhost:2181(CONNECTED) 20] quit

为什么会有 临时节点?
如果我们把zk 中的节点当成一个 锁(标记),是有某一个客户端创建,如果存在,客户端持有锁,假如客户端挂了,断开连接 ,节点会自动删除, 可以利用此特性 防止死锁

有序节点 ?
可以作为队列使用保证消息的顺序性

[zk: localhost:2181(CONNECTED) 4] create -s /java b -s sequence 创建有序节点 节点是以java 开头
[zk: localhost:2181(CONNECTED) 6] create -e /html div -e ephemeral 临时节点 只在当前连接存在 ,只要退出当前客户端 ,立即自动删除
[zk: localhost:2181(CONNECTED) 1] create -e -s /php php 创建临时有序节点

三、zk 集群模式

zk 集群模式 :至少三台机器,机器数量是奇数个, 如果机器可用的数量小于一半,则真个集群不能使用,无法提供服务( 3/2 =1.5 挂两台 剩余一台可用 ,小于1.5,此时集群不可用,因为无法发起选举机制)


集群中 节点的状态/角色

Leader: 领导者 就是master 节点
Follower :跟随者 也就是从节点 slave
Observer: 观察者 也是从节点 ,比较特殊 不参与主节点选举,只提供读操作
Looking:是一个临时转态,在集群 选举master 过程中,所有的Follower 节点都会去竞选master ,此时会将自己的状态改为 Looking

四、搭建zk集群

1、关闭单机的zk

2、配置

[root@mastera java2102]# mkdir docker-compose-zk2
[root@mastera java2102]# cd docker-compose-zk2
[root@mastera docker-compose-zk2]# vim docker-compose.yaml

version: “3.1”
services:
zk1:
image: zookeeper
restart: always
container_name: zk1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
zk2:
image: zookeeper
restart: always
container_name: zk2
ports:
- 2182:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
zk3:
image: zookeeper
restart: always
container_name: zk3
ports:
- 2183:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181

启动

docker-compose up

检查zk 节点状态

检查zk1

检查zk2

检查zk3

五、zk集群的选举机制

选举机制,投票,谁得到的票数最多,谁就是主节点。

什么情况下需要投票?

  • 1.主节点 挂了,群龙无首 需要选举
  • 2.在集群启动时 也需要 选举主节点 三台zk集群,只需两台就可以作为集群使用
  • 3.集群中的Follower数量不足以通过半数检验,Leader会挂掉自己,选举新leader
  • 4.新加入新的子节点

在集群运行过程中 master 节点挂掉的选举机制

1.每个人首先给自己都一票
2.每个人都会知道 其他队友的 myid zxid
3.每个人会将自己的一票 投给 zxid 最大的那个人


在集群初始化启动时的选举机制


六、zk java api操作

在实际开发过程中 zk 官方提供的api,极其难用,尤其是在监听事件中,所以 我们使用第三方封装的接口curator (类似mybatis )

1、创建 java 工程引入依赖

<dependencies>

    <!--  zk 依赖-->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.6.0</version>
    </dependency>

    <!--zk  封装  -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.1</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>

2、创建工具类

package com.qfedu;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @author xue_yun_xiang
 * @create 2021-06-24-16:05
 */
public class ZkUtils 


    //封装了zkClient
    public static CuratorFramework creatCuratorFramework() 

        //客户端重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 2);

        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.42.128:2181,192.168.42.128:2182,192.168.42.128:2183", retryPolicy);

        //发起连接
        curatorFramework.start();

        return curatorFramework;
    


3、测试

package com.qfedu;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

/**
 * @author xue_yun_xiang
 * @create 2021-06-24-16:11
 */


public class ZkTest 

    private CuratorFramework curatorFramework;

    @Before
    public void init()
        curatorFramework = ZkUtils.creatCuratorFramework();
    

    @Test
    //获取节点列表
    public void list() throws Exception 
        List<String> list = curatorFramework.getChildren().forPath("/");

        for (String node : list) 
            System.out.println("根路径下的节点node = " + node);
        
    

    @Test
    //获取节点数据
    public void getData() throws Exception 
        byte[] bytes = curatorFramework.getData().forPath("/php");
        System.out.println(new String(bytes,"utf-8"));
    

    //增加节点
    @Test
    public void createNode() throws Exception 

        String node = curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/php","php是世界上最好的语言".getBytes());

        System.out.println(node);
    

    //修改节点
    @Test
    public void update() throws Exception 
        curatorFramework.setData().forPath("/php","php不行了,转java吧吧吧吧吧".getBytes());
        System.out.println("修改成功");
    
    //删除节点
    @Test
    public void delete() throws Exception 
        curatorFramework.delete().deletingChildrenIfNeeded().forPath("/php");
        System.out.println("删除成功");
    

    //查看znode的状态
    @Test
    public void stat() throws Exception 
        Stat stat = curatorFramework.checkExists().forPath("/php");
        System.out.println(stat);
    



/**
 * 监听:  时间 需要 多等 一会 30s
 *
 * 最大的使用场景 就是使用zk 作为配置文件中心
 *
 * 有一个小米公司开发的 zk 配置中心控制台
 *
 **/
    @Test
    public void listenTest() throws Exception 
        // 具有监听功能的客户端
        final NodeCache nodeCache = new NodeCache(curatorFramework, "/php");
        //开始监听
        nodeCache.start();

        nodeCache.getListenable().addListener(new NodeCacheListener() 
            public void nodeChanged() throws Exception 
                //可以得到节点的数据变化
                byte[] bytes = nodeCache.getCurrentData().getData();

                System.out.println("变化后的数据:" + new String(bytes,"utf-8"));
                System.out.println("nodeCache = " + nodeCache.getCurrentData().getStat());
            
        );

        System.out.println("**********");
        System.in.read();

    


以上是关于Apache ZooKeeper的主要内容,如果未能解决你的问题,请参考以下文章

详解Apache Pulsar的Topic绑定Broker

Zookeeper开源客户端Curator之基本功能讲解

Zookeeper3.5.7版本——选举机制(第一次启动时)

Zookeeper基于API操作Node节点

Zookeeper基于API操作Node节点

Apache Kafka 删除 Apache ZooKeeper 的依赖