zookeeper学习

Posted 红豆和绿豆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper学习相关的知识,希望对你有一定的参考价值。

下载安装单机zookeeper的server

 

 

 

 

 zookeeper的操作

zookeeper的数据结构

服务端常用命令

 

客户端常用命令

 

 

临时节点,客户端关闭,服务端创建的节点则会删除

不能重复创建节点

不能删除带子节点的ZNode

javaAPI操作Zookeeper的节点操作

 

 

 

curator的增删改查连接基本操作

package curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

public class CuratorTest {


    private CuratorFramework curatorFramework;
    /**
     * 建立连接
     */
    @Before
    public void  testConnect(){
        //第一种方式
        /**
         * String connectString,  连接字符串 zkServer的地址和端口
         * int sessionTimeoutMs, 会话超时时间  单位为毫秒 跟zkServer会话
         * int connectionTimeoutMs, 建立连接的超时时间
         * RetryPolicy retryPolicy 重试策略 客户端连接失败的策略
         *
         */
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
        curatorFramework = CuratorFrameworkFactory.newClient("127.0.0.1:2181,27.0.0.2:2181", 60 * 1000, 15 * 1000, retryPolicy);

        //开启客户端
        curatorFramework.start();

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,27.0.0.2:2181").
                connectionTimeoutMs(15 * 1000).sessionTimeoutMs(60 * 1000).namespace("heihi")//默认是一个根节点
                .retryPolicy(retryPolicy).build();
        client.start();

    }

    //------------------------------------创建--------------------


    /**
     * 创建一个ZNode节点
     * create 节点名字  类型(永久,临时,顺序)  内容
     *
     * 基本创建
     * 设置节点内容
     * 设置节点类型
     * 创建多级节点
     */
    @Test
    public void  testCreate() throws Exception {

        /**
         * 默认创建的目录的父目录都是heihi  如果不放任何数据,会默认放一个客户端的ip地址
         */
        String forPath = curatorFramework.create().forPath("/app1");
        System.out.println(forPath);
    }

    @Test
    public void  testCreate2() throws Exception {

        String forPath = curatorFramework.create().forPath("/app2","data".getBytes());
        System.out.println(forPath);
    }

    /**
     * 创建节点的模式
     * @throws Exception
     */
    @Test
    public void  testCreate3() throws Exception {

        /**
         * 默认类型为持久化的节点 withMode
         * CreateMode 使用这个节点进行设置
         */
        String forPath = curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","data".getBytes());
        System.out.println(forPath);
    }

    /**
     * 创建多级节点
     * @throws Exception
     */
    @Test
    public void  testCreate4() throws Exception {

        /**
         * 默认类型为持久化的节点 withMode
         * CreateMode 使用这个节点进行设置
         * creatingParentsIfNeeded  允许父节点不存在创建父节点
         */
        String forPath = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/app4/p1","data".getBytes());
        System.out.println(forPath);
    }

    //------------------------------------查询---------------------------
    /**
     * 查询节点数据getData().forPath
     * 查询子节点 curatorFramework.getChildren().forPath
     * 查询节点的状态信息 ls -sgetData().storingStatIn(stat).forPath
     *
     * @throws Exception
     */
    @Test
    public void  testGet() throws Exception {
        //查询/app1 这个节点的数据

        byte[] bytes = curatorFramework.getData().forPath("/app1");
        System.out.println(new String(bytes));

    }


    /**
     * 查询子节点
     * @throws Exception
     */
    @Test
    public void  testGet2() throws Exception {
        //查询一个节点的所有子节点
        List<String> list = curatorFramework.getChildren().forPath("/app4");
        System.out.println(list);
    }

    /**
     * 查询节点的状态
     * @throws Exception
     */
    @Test
    public void  testGet3() throws Exception {
        Stat stat=new Stat();
        System.out.println(stat);

        byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/app1");
        System.out.println(stat);
    }

    //------------------------------------修改---------------------------
    /**
     * 修改节点数据 setData().forPath
     * @throws Exception
     */
    @Test
    public void  testSet() throws Exception {
        curatorFramework.setData().forPath("/app1","heihei".getBytes());
    }

    /**
     * 根据版本修改 setData().withVersion
     * @throws Exception
     */
    @Test
    public void  testSetForVersion() throws Exception {
        Stat stat=new Stat();
        byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/app1");
        int version=stat.getVersion();
        //解决并发的问题 保证其它客户端不要干扰当前客户端
        curatorFramework.setData().withVersion(version).forPath("/app1","heihei".getBytes());
    }

    //------------------------------------删除---------------------------

    /**
     * 删除单个节点
     * delete().forPath
     * @throws Exception
     */
    @Test
    public void  testDelete() throws Exception {
        curatorFramework.delete().forPath("/app1");
    }

    /**
     * 删除带有子节点的节点
     * delete().deletingChildrenIfNeeded().forPath
     * @throws Exception
     */
    @Test
    public void  testDelete2() throws Exception {
        curatorFramework.delete().deletingChildrenIfNeeded().forPath("/app4");
    }

    /**
     * 必须成功删除节点 防止网络抖动
     * delete().guaranteed().forPath
     * @throws Exception
     */
    @Test
    public void  testDelete3() throws Exception {
        curatorFramework.delete().guaranteed().forPath("/app2");
    }

    /**
     * 删除回调
     * delete().guaranteed().inBackground
     * @throws Exception
     */
    @Test
    public void  testDelete4() throws Exception {
        //删除回调
        curatorFramework.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("我被删除了client:"+client+"--"+event);
            }
        }).forPath("/app1");

    }
    @After
    public void testClose(){
        if(curatorFramework!=null){
            curatorFramework.close();
        }
    }



}

curator的watch的监控

具体的代码实现

package curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;

public class CuratorWatchTest {


    private CuratorFramework curatorFramework;
    /**
     * 建立连接
     */
    @Before
    public void  testConnect(){
        //第一种方式
        /**
         * String connectString,  连接字符串 zkServer的地址和端口
         * int sessionTimeoutMs, 会话超时时间  单位为毫秒 跟zkServer会话
         * int connectionTimeoutMs, 建立连接的超时时间
         * RetryPolicy retryPolicy 重试策略 客户端连接失败的策略
         *
         */
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
        curatorFramework = CuratorFrameworkFactory.newClient("127.0.0.1:2181,27.0.0.2:2181", 60 * 1000, 15 * 1000, retryPolicy);

        //开启客户端
        curatorFramework.start();

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181,27.0.0.2:2181").
                connectionTimeoutMs(15 * 1000).sessionTimeoutMs(60 * 1000).namespace("heihi")//默认是一个根节点
                .retryPolicy(retryPolicy).build();
        client.start();

    }

    /**
     * 监控单个节点变化
     */
    @Before
    public void  testNodeCache() throws Exception {
        //创建NodeCache
        /**
         * @param client curztor client
         * @param path the full path to the node to cache
         * @param dataIsCompressed if true, data in the path is compressed
         */
        NodeCache nodeCache=new  NodeCache(curatorFramework,"/app1");
        //注册监控
        /**
         * 增删改都会回调
         */
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("  nodeChanged");
                //获取修改节点后的数据
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));
            }
        });
        //开始监听
        /**
         * 如果设置为true 开启监听 加载缓冲数据
         */
        nodeCache.start(true);
    }


    /**
     * 监控单个节点子节点变化
     */
    @Before
    public void  testPathChildernCache() throws Exception {
        //创建PathChildrenCache
        /**
         * @param client           the client
         * @param path             path to watch
         * @param cacheData        if true, node contents are cached in addition to the stat
         * @param dataIsCompressed if true, data in the path is compressed
         * @param executorService  Closeable ExecutorService to use for the PathChildrenCache's background thread. This service should be single threaded, otherwise the cache may see inconsistent results.
         */
        PathChildrenCache pathChildrenCache=new  PathChildrenCache(curatorFramework,"/app2",true);
        //注册监控
        /**
         * 增删改都会回调
         */
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            /**
             * /**
             *          * A child was added to the path
             *          *
             *  CHILD_ADDED,
                    *
                    *         /**
             *          * A child's data was changed
             *          *
                    *CHILD_UPDATED,
                    *
                    *         /**
             *          * A child was removed from the path
             *          *
                    *CHILD_REMOVED,
             * @param client
             * @param event
             * @throws Exception
             */
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("children nodeChanged");
                //监听子节点数据变更才进行处理
                if(event.getType()== PathChildrenCacheEvent.Type.CHILD_UPDATED){
                    System.out.println(event.getData().getData());
                }

            }
        });
        //开始监听
        /**
         * 如果设置为true 开启监听 加载缓冲数据
         */
        pathChildrenCache.start();
    }


    /**
     * 监控某个节点和所有 子节点
     */
    @Before
    public void  testTreeCache() throws Exception {
        //创建TreeCache
        /**
         * @param client curztor client
         * @param path the full path to the node to cache
         * @param dataIsCompressed if true, data in the path is compressed
         */
        TreeCache treeCache=new  TreeCache(curatorFramework,"/app1");
        //注册监控
        /**
         * 增删改都会回调
         */
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("nodechange");
                System.out.println("client:"+client+"event"+event);
            }
        });
        //开始监听
        /**
         * 如果设置为true 开启监听 加载缓冲数据
         */
        treeCache.start();
    }

    @After
    public void testClose(){
        if(curatorFramework!=null){
            curatorFramework.close();
        }
    }

}

 zookeeper实现分布式锁

 

基于redis的分布式锁:https://blog.csdn.net/u011955252/article/details/114174257

zookeeper的分布式锁的原理

 

 

package curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Ticket12306 implements Runnable{


    private int tickets=10;

    private InterProcessMutex lock;

    public Ticket12306(){
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
        CuratorFramework curatorFramework = CuratorFrameworkFactory.
                newClient("127.0.0.1:2181,27.0.0.2:2181", 60 * 1000, 15 * 1000, retryPolicy);
        //开启客户端
        curatorFramework.start();
        lock=new InterProcessMutex(curatorFramework,"/lock");
    }

    @Override
    public void run() {
        while (true){
            //加锁
            try{
                lock.acquire();
                if(tickets>0){
                    System.out.println(Thread.currentThread().getName()+" 买票了"+tickets);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        }
    }
}
package curator;

public class LockTest {

    public static void main(String[] args) {
        Ticket12306 ticket12306=new Ticket12306();

        new Thread(ticket12306,"肥猪").start();

        new Thread(ticket12306,"携程").start();

    }
}

zookeeper  集群搭建

 

 

以上是关于zookeeper学习的主要内容,如果未能解决你的问题,请参考以下文章

在ansible模板中使用动态组名称

java SpringRetry学习的代码片段

python 机器学习有用的代码片段

zookeeper学习笔记

学习笔记:python3,代码片段(2017)

学习 PyQt5。在我的代码片段中找不到错误 [关闭]