javaAPI操作-Zookeeper

Posted 小乞丐程序员

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了javaAPI操作-Zookeeper相关的知识,希望对你有一定的参考价值。

· ## 4)ZooKeeper JavaAPI 操作

4.1)Curator介绍

•Curator 是 Apache ZooKeeper 的Java客户端库。

•常见的ZooKeeper Java API :

•原生Java API

•ZkClient

•Curator

•Curator 项目的目标是简化 ZooKeeper 客户端的使用。

•Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。

•官网:http://curator.apache.org/

4.2)JavaAPI操作建立连接

1,搭建项目

创建项目curator-zk

引入pom和日志文件

资料文件夹下pom.xml和log4j.properties

 /**
     * 建立连接
     */
    @Test
    public void testConnect() 

        /**
         * @param connectSting 链接字符串 zk server 地址和端口 192.168.0.102 :2181,192.168.0.103:2181
         * @param sessionTimeoutMs 会话超时时间 单位ms
         * @param connectionTimeoutMs 链接超时时间 单位ms
         * @param retryPolicy 重试策略
         */

        /// 创建重试策略 休眠时间, 重试次数
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        /// 1 第一种方式
//        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.0.102 :2181",
//                60 * 1000, 15 * 1000, retryPolicy);
//        /// 开启链接
//        curatorFramework.start();
        /// 2 第二种方式
        CuratorFramework build = CuratorFrameworkFactory.builder().connectString("192.168.0.102 :2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).namespace("itheima").build();
        build.start();

    

4.3)Zookeeper JavaAPI操作-创建节点

/**

  • 创建节点:create 持久 临时 顺序 数据
    1. 基本创建 :create().forPath(“”)
    1. 创建节点 带有数据:create().forPath(“”,data)
    1. 设置节点的类型:create().withMode().forPath(“”,data)
    1. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath(“”,data)
      */
      节点可以分为四大类:

PERSISTENT 持久化节点
EPHEMERAL 临时节点 :-e
PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s
EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es

package com.itheima.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorTest 

    private  CuratorFramework build;

    /**
     * 建立连接
     */
    @Before
    public void testConnect() 

        /**
         * @param connectSting 链接字符串 zk server 地址和端口 192.168.0.102 :2181,192.168.0.103:2181
         * @param sessionTimeoutMs 会话超时时间 单位ms
         * @param connectionTimeoutMs 链接超时时间 单位ms
         * @param retryPolicy 重试策略
         */

        /// 创建重试策略 休眠时间, 重试次数
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        /// 1 第一种方式
//        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.0.102 :2181",
//                60 * 1000, 15 * 1000, retryPolicy);
//        /// 开启链接
//        curatorFramework.start();
        /// 2 第二种方式
        build = CuratorFrameworkFactory.builder().connectString("192.168.0.102:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).namespace("itheima").build();
        build.start();

    

    /**
     * 创建节点 create 持久 临时 顺序 数据
     * 1 基本创建 
     * 2 创建节点, 带有数据
     * 3 设置节点的类型
     * 4 创建多级节点
     */

    @Test
    public void testCreate() throws Exception 
        /// 1 基本创建
        //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
        String path = build.create().forPath("/app1");
        System.out.println(path);
    

    @Test
    public void testCreate2() throws Exception 
        /// 2 创建节点, 带有数据
        //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
        /// 第二个参数是getBytes数组
        String path = build.create().forPath("/app2","hehe".getBytes());
        System.out.println(path);
    

    @Test
    public void testCreate3() throws Exception 
        /// 3 设置节点的类型
       /// 默认类型:持久化 临时的
        String path = build.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
        System.out.println(path);
    

    @Test
    public void testCreate4() throws Exception 
        /// 4 创建多级节点
        /// 默认类型:持久化 临时的
        /// creatingParentContainersIfNeeded():如果父节点不存在,则创建父节点
        String path = build.create().creatingParentContainersIfNeeded().forPath("/app4/p1");
        System.out.println(path);
    
    @After
    public void close() 
        if(build != null) 
            build.close();
        
    


4.4)ZookeeperJavaAPI操作-查询节点

/**

  • 查询节点:
    1. 查询数据:get: getData().forPath()
    1. 查询子节点: ls: getChildren().forPath()
    1. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
      */
package com.itheima.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

public class CuratorTest 

    private  CuratorFramework build;

    /**
     * 建立连接
     */
    @Before
    public void testConnect() 

        /**
         * @param connectSting 链接字符串 zk server 地址和端口 192.168.0.102 :2181,192.168.0.103:2181
         * @param sessionTimeoutMs 会话超时时间 单位ms
         * @param connectionTimeoutMs 链接超时时间 单位ms
         * @param retryPolicy 重试策略
         */

        /// 创建重试策略 休眠时间, 重试次数
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        /// 1 第一种方式
//        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.0.102 :2181",
//                60 * 1000, 15 * 1000, retryPolicy);
//        /// 开启链接
//        curatorFramework.start();
        /// 2 第二种方式
        build = CuratorFrameworkFactory.builder().connectString("192.168.0.102:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).namespace("itheima").build();
        build.start();

    
    /**
     * 查询节点:
     * 1 查询数据 get
     * 2 查询子节点 ls
     * 3 查询节点状态信息 ls -s
     * @throws Exception
     */
    @Test
    public void testGet1() throws Exception 
        ///1 查询数据 get
        byte[] bytes = build.getData().forPath("/app1");
        System.out.println(new String(bytes)); ///192.168.198.1
    

    @Test
    public void testGet2() throws Exception 
        ///2 查询子节点 ls
       //List<String> strings = build.getChildren().forPath("/app4");///[p1]
        List<String> strings = build.getChildren().forPath("/"); ///[app2, app1, app4]
        System.out.println(strings);
    

    @Test
    public void testGet3() throws Exception 
        ///3 查询节点状态信息 ls -s
        Stat status = new Stat();
        System.out.println(status); ///0,0,0,0,0,0,0,0,0,0,0
        byte[] bytes = build.getData().storingStatIn(status).forPath("/app1"); //3,3,1673978538957,1673978538957,0,0,0,0,13,0,3
        System.out.println(status);
    

    @After
    public void close() 
        if(build != null) 
            build.close();
        
    


4.5)Zookeeper JavaAPI操作-修改节点

/**

  • 修改数据
    1. 基本修改数据:setData().forPath()
    1. 根据版本修改: setData().withVersion().forPath()
    • version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
      */
package com.itheima.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

public class CuratorTest 

    private  CuratorFramework build;

    /**
     * 建立连接
     */
    @Before
    public void testConnect() 

        /**
         * @param connectSting 链接字符串 zk server 地址和端口 192.168.0.102 :2181,192.168.0.103:2181
         * @param sessionTimeoutMs 会话超时时间 单位ms
         * @param connectionTimeoutMs 链接超时时间 单位ms
         * @param retryPolicy 重试策略
         */

        /// 创建重试策略 休眠时间, 重试次数
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        /// 1 第一种方式
//        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.0.102 :2181",
//                60 * 1000, 15 * 1000, retryPolicy);
//        /// 开启链接
//        curatorFramework.start();
        /// 2 第二种方式
        build = CuratorFrameworkFactory.builder().connectString("192.168.0.102:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).namespace("itheima").build();
        build.start();

    
///============================SET==============================================
    /**
     * 修改数据
     * 1. 基本修改数据:setData().forPath()
     * 2. 根据版本修改: setData().withVersion().forPath()
     * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
     *
     * @throws Exception
     */
    @Test
    public void testSet() throws Exception 
        //1. 基本修改数据:setData().forPath()
       build.setData().forPath("/app1", "itcast".getBytes());
    

    @Test
    public void testSetForVersion() throws Exception 
        //     * 2. 根据版本修改: setData().withVersion().forPath()
        //     * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
        Stat status = new Stat();
        build.getData().storingStatIn(status).forPath("/app1");
        int version = status.getVersion(); /// 查询出来的 1
        System.out.println(version);
        build.setData().withVersion(version).forPath("/app1", "haha".getBytes());
    

    @After
    public void close() 
        if(build != null) 
            build.close();
        
    


4.6)Zookeeper JavaAPI操作-删除节点

/**

  • 删除节点: delete deleteall
    1. 删除单个节点:delete().forPath(“/app1”);
    1. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath(“/app1”);
    1. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath(“/app2”);
    1. 回调:inBackground
  • @throws Exception
    */
package com.itheima.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

public class CuratorTest 

    private  CuratorFramework build;

    /**
     * 建立连接
     */
    @Before
    public void testConnect() 

        /**
         * @param connectSting 链接字符串 zk server 地址和端口 192.168.0.102 :2181,192.168.0.103:2181
         * @param sessionTimeoutMs 会话超时时间 单位ms
         * @param connectionTimeoutMs 链接超时时间 单位ms
         * @param retryPolicy 重试策略
         */

        /// 创建重试策略 休眠时间, 重试次数
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        /// 1 第一种方式
//        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.0.102 :2181",
//                60 * 1000, 15 * 1000, retryPolicy);
//        /// 开启链接
//        curatorFramework.start();
        /// 2 第二种方式
        build = CuratorFrameworkFactory.builder().connectString("192.168.0.102:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).namespace("itheima").build();
        build.start();

    
    ///============================delete==============================================

    /**
     * 删除节点: delete deleteall
     * 1. 删除单个节点:delete().forPath("/app1");
     * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
     * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。  client.delete().guaranteed().forPath("/app2");
     * 4. 回调:inBackground
     * @throws Exception
     */
    @Test
    public void testDelete() throws Exception 
        // 1. 删除单个节点:delete().forPath("/app1");
        build.delete().forPath("/app1");
    

    @Test
    public void testDelete1() throws Exception 
        // 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
        build.delete().deletingChildrenIfNeeded().forPath("/app4");
    

    @Test
    public void testDelete2() throws Exception 
        // 3. 必须成功的删除:为了防止网络抖动。本质就是重试。  client.delete().guaranteed().forPath("/app2");
        build.delete().guaranteed().forPath("/app2");
    

    @Test
    public void testDelete3() throws Exception 
        //4. 回调:inBackground
        build.delete().guaranteed().inBackground(new BackgroundCallback() 
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception 以上是关于javaAPI操作-Zookeeper的主要内容,如果未能解决你的问题,请参考以下文章

JavaApi操作ElasticSearch(强烈推荐)

javaAPI操作-Zookeeper

hdfs-javaAPI操作

JavaAPI操作hbase

hdfs深入:10hdfs的javaAPI操作

HBase的JavaAPI操作