三天学会ZooKeeper第三天(全网最细)

Posted 活跃的咸鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了三天学会ZooKeeper第三天(全网最细)相关的知识,希望对你有一定的参考价值。

本篇文章来自黑马程序员ZooKeeper教程

三天学会ZooKeeper第一天(全网最细)
三天学会ZooKeeper第二天(全网最细)

zookeeper 开源客户端curator介绍

curator简介

curator是Netflix公司开源的一个zookeeper客户端,后捐献给apache, curator框架在zookeeper原生API接口上进行了包装,解决了很多zooKeeper客户端非常底层的细节开发。提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。

原生zookeeperAPI的不足:

  1. 连接对象异步创建,需要开发人员自行编码等待
  2. 连接没有自动重连超时机制
  3. watcher一次注册生效一次
  4. 不支持递归创建树形节点

curator特点:

  1. 解决session会话超时重连
  2. watcher反复注册
  3. 简化开发api
  4. 遵循Fluent风格的API
  5. 提供了分布式锁服务、共享计数器、缓存机制等机制

Curator的组件
在这里插入图片描述
Maven依赖
在这里插入图片描述
我们在实际的应用时,最常用的是curator-recipes。

连接到ZooKeeper

依赖

       <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.2.0</version>
        </dependency>
public class testConnection {
    public static void main(String[] args) {
        // session重连策略
        /*
        3秒后重连一次,只重连1次
        RetryPolicy retryPolicy = new RetryOneTime(3000);
        */
        /*
        每3秒重连一次,重连3次
        RetryPolicy retryPolicy = new RetryNTimes(3,3000);
        */
        /*
        每3秒重连一次,总等待时间超过10秒后停止重连
        RetryPolicy retryPolicy=new RetryUntilElapsed(10000,3000);
        */
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
       CuratorFramework client = CuratorFrameworkFactory.builder()
               // IP地址端口号
               .connectString("192.168.1.108")
                                // 会话超时时间
                               .sessionTimeoutMs(5000)
                                // 重连机制
                               .retryPolicy(retryPolicy)
                                // 命名空间
                               .namespace("create")
                                // 构建连接对象
                               .build();
       client.start();
        System.out.println(client.isStarted());
        client.close();
    }
}

官网的打开连接方式

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();

新增节点

public class testCreate {
    private String ip="192.168.1.108:2181";
    CuratorFramework client;
    RetryPolicy retryPolicy= new RetryUntilElapsed(2000,3);
    @Before
    public void conn(){
        client= CuratorFrameworkFactory.builder().connectString(ip).
                sessionTimeoutMs(5000).
                retryPolicy(retryPolicy)
                .namespace("create").build();
        client.start();
    }
    @Test
    public void create()throws Exception{
        client.create()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.READ_ACL_UNSAFE)
                .forPath("/node","node1".getBytes());
        System.out.println("end!!!");
    }
    @Test
    public void create2() throws Exception{
        // 自定义权限列表
        // 权限列表
        List<ACL> list = new ArrayList<ACL>();
       // 授权模式和授权对象
        Id id = new Id("ip", "192.168.1.102");
        list.add(new ACL(ZooDefs.Perms.ALL, id));
        client.create().withMode(CreateMode.PERSISTENT).withACL(list).
                forPath("/node2", "node2".getBytes());
                System.out.println("结束");

    }
    @Test
    public void create3()throws Exception{
        // 递归创建节点树
        client.create()
                // 递归节点的创建
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath("/node3/node31", "node31".getBytes());
        System.out.println("结束");
    }
    @Test
    public void create4() throws Exception {
       // 异步方式创建节点
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                // 异步回调接口
                .inBackground(new BackgroundCallback() {
                    public void processResult(CuratorFramework
                                                      curatorFramework, CuratorEvent curatorEvent) throws Exception {
                        // 节点的路径
                        System.out.println(curatorEvent.getPath());
                        // 时间类型
                        System.out.println(curatorEvent.getType());
                    }
                })
                .forPath("/node4","node4".getBytes());
        Thread.sleep(5000);
        System.out.println("结束");
    }

    @After
    public void close(){
        client.close();
    }
}

更新节点

public class TestSet {
    private String ip="192.168.1.102:2181";
    CuratorFramework client;
    RetryPolicy retryPolicy= new RetryUntilElapsed(2000,3);
    @Before
    public void conn(){
        client= CuratorFrameworkFactory.builder().connectString(ip).
                sessionTimeoutMs(5000).
                retryPolicy(retryPolicy)
                .namespace("set").build();
        client.start();
    }
    @Test
    public void set1() throws Exception {
        // 更新节点
        client.setData()
              // arg1:节点的路径
              // arg2:节点的数据
                .forPath("/node1", "node11".getBytes());
        System.out.println("结束");

    }
    @Test
    public void set2() throws Exception {
        client.setData()
               // 指定版本号
                .withVersion(2)
                .forPath("/node1", "node1111".getBytes());
        System.out.println("结束");
    }
    @Test
    public void set3() throws Exception {
        // 异步方式修改节点数据
        client.setData()
                .withVersion(-1).inBackground(new BackgroundCallback() {
            public void processResult(CuratorFramework curatorFramework,
                                      CuratorEvent curatorEvent) throws Exception {
                 // 节点的路径
                System.out.println(curatorEvent.getPath());
               // 事件的类型
                System.out.println(curatorEvent.getType());
            }
        }).forPath("/node1", "node1".getBytes());
        Thread.sleep(5000);
        System.out.println("结束");
    }



    @After
    public void close(){
        client.close();
    }
}

删除节点

public class testDelete {
    private String ip="192.168.1.102:2181";
    CuratorFramework client;
    RetryPolicy retryPolicy= new RetryUntilElapsed(2000,3);
    @Before
    public void conn(){
        client= CuratorFrameworkFactory.builder().connectString(ip).
                sessionTimeoutMs(5000).
                retryPolicy(retryPolicy)
                .namespace("create").build();
        client.start();
    }

    @Test
    public void delete1() throws Exception {
      // 删除节点
        client.delete()
     // 节点的路径
                .forPath("/node1");
        System.out.println("结束");
    }
    @Test
    public void delete2() throws Exception {
        client.delete()
                // 版本号
                .withVersion(0)
                .forPath("/node1");
        System.out.println("结束");
    }
    @Test
    public void delete3() throws Exception {
         //删除包含字节点的节点
        client.delete()
                .deletingChildrenIfNeeded()
                .withVersion(-1)
                .forPath("/node1");
        System.out.println("结束");
    }
    @Test
    public void delete4() throws Exception {
        // 异步方式删除节点
        client.delete()
                .deletingChildrenIfNeeded()
                .withVersion(-1)
                .inBackground(new BackgroundCallback() {
                    public void processResult(CuratorFramework
                                                      curatorFramework, CuratorEvent curatorEvent) throws Exception {
                       // 节点路径
                        System.out.println(curatorEvent.getPath());
                       // 事件类型
                        System.out.println(curatorEvent.getType());
                    }
                })
                .forPath("/node1");
        Thread.sleep(5000);
        System.out.println("结束");
    }
    @After
    public void close(){
        client.close();
    }
}

查看节点

public class TestGet {
    private String ip="192.168.1.102:2181";
    CuratorFramework client;
    RetryPolicy retryPolicy= new RetryUntilElapsed(2000,3);
    @Before
    public void conn(){
        client= CuratorFrameworkFactory.builder().connectString(ip).
                sessionTimeoutMs(5000).
                retryPolicy(retryPolicy)
                .namespace("create").build();
        client.start();
    }
    @Test
    public void get1() throws Exception {
          // 读取节点数据
        byte [] bys=client.getData()
           // 节点的路径
                .forPath("/node1");
        System.out.println(new String(bys));
    }
    @Test
    public void get2() throws Exception {
     // 读取数据时读取节点的属性
        Stat stat=new Stat();
        byte [] bys=client.getData()
            // 读取属性
                .storingStatIn(stat)
                .forPath("/node1");
        System.out.println(new String(bys));
        System.out.println(stat.getVersion());
    }
    @Test
    public void get3() throws Exception {
           // 异步方式读取节点的数据
        client.getData()
                .inBackground(new BackgroundCallback() {
                    public void processResult(CuratorFramework
                                                      curatorFramework, CuratorEvent curatorEvent) throws Exception {
                       // 节点的路径
                        System.out.println(curatorEvent.getPath());
                        // 事件类型
                        System.out.println(curatorEvent.getType());
                        // 数据
                        System.out.println(new
                                String(curatorEvent.getData()));
                    }
                })
                .forPath("/node1");
        Thread.sleep(5000);
        System.out.println("结束");
    }
    @After
    public void close(){
        client.close();
    }
}

查看子节点

public class TestGetchlden {

    private String ip="192.168.1.102:2181";
    CuratorFramework client;
    RetryPolicy retryPolicy= new RetryUntilElapsed(2000,3);
    @Before
    public void conn(){
        client= CuratorFrameworkFactory.builder().connectString(ip).
                sessionTimeoutMs(5000).
                retryPolicy(retryPolicy)
                .namespace("get").build();
        client.start()三天学会ZooKeeper第二天(全网最细)

ZooKeeper面试题总结

10天学会perl第三天——条件与循环

10天学会phpWeChat——第三天:从数据库读取数据到视图

程序老鸟C#学习:3天学会全部基础--第三天

5天学会jaxws-webservice编程第三天