Java代码操作zookeeper

Posted sun-flower1314

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java代码操作zookeeper相关的知识,希望对你有一定的参考价值。

以下为一个完整JAVA操作Zookeeper项目步骤:

 

1. 项目中pom.xml中添加需要的依赖jar包信息

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.9</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
    </dependency>
</dependencies>

 

2. 在resource下添加log4j.properties日志打印信息

log4j.rootLogger=DEBUG,myConsole
log4j.appender.myConsole=org.apache.log4j.ConsoleAppender
log4j.appender.myConsole.ImmediateFlush=true
log4j.appender.myConsole.Target=System.out
log4j.appender.myConsole.layout=org.apache.log4j.PatternLayout
log4j.appender.myConsole.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

 

 3. 使用Java代码操作Zookeeper

  包括创建节点、设置节点值、获取节点值、判断节点是否存在

  创建节点时,存在四种模式:(即在createZKNode方法中)

    1. CreateMode.PERSISTENT :持久节点,一旦创建就保存到硬盘上面

    2. CreateMode.SEQUENTIAL : 顺序持久节点

    3. CreateMode.EPHEMERAL :临时节点,创建以后如果断开连接则该节点自动删除

    4. CreateMode.EPHEMERAL_SEQUENTIAL :顺序临时节点

  创建ZKOperaDemo.java类:

package com.hxc.zookeeperDemo;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooDefs.Ids;

/**
 * 
 * @author sun_flower
 * 
 */
public class ZKOperaDemo 
    
    private static String connectString = "192.168.202.132:2181";
    private static int sessionTimeout = 50 * 1000;
    /**
     * 连接Zookeeper服务器
     * @return
     * @throws IOException
     */
    public ZooKeeper connectionZooKeeper() throws IOException 
        
        ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() 
            
            public void process(WatchedEvent event) 
                //可做其他操作(设置监听或观察者)
            
        );
        return zooKeeper;
    
    
    /**
     * 创建节点
     * 1. CreateMode.PERSISTENT :持久节点,一旦创建就保存到硬盘上面
     2.  CreateMode.SEQUENTIAL : 顺序持久节点
     3.  CreateMode.EPHEMERAL :临时节点,创建以后如果断开连接则该节点自动删除
     4.  CreateMode.EPHEMERAL_SEQUENTIAL :顺序临时节点
     * @param zooKeeper Zookeeper已经建立连接的对象
     * @param path 要创建节点的路径
     * @param data 该节点上的数据
     * @return 返回创建的节点的路径
     * @throws KeeperException
     * @throws InterruptedException
     */
    public String createZKNode(ZooKeeper zooKeeper, String path, String data) throws KeeperException, InterruptedException 
        byte[] bytesData = data.getBytes();
        //访问控制列表
        ArrayList<ACL> openAclUnsafe = Ids.OPEN_ACL_UNSAFE;
        //创建模式
        CreateMode mode = CreateMode.PERSISTENT;
        String result = zooKeeper.create(path, bytesData, openAclUnsafe, mode);
        System.out.println("创建节点成功: " + result);
        return result;
    
    
    /**
     * 获取节点上的数据
     * @param zooKeeper Zookeeper已经建立连接的对象
     * @param path 节点路径
     * @return 返回节点上的数据
     * @throws KeeperException
     * @throws InterruptedException
     */
    public String getZKNodeData(ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException 
        byte[] data = zooKeeper.getData(path, false, new Stat());
//        System.out.println("该节点" + path + "上的数据伟: " + new String(data));
        return new String(data);
    
    
    /**
     * 设置节点上的数据
     * @param zooKeeper Zookeeper已经建立连接的对象
     * @param path 节点路径
     * @param data
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public Stat setZKNodeData(ZooKeeper zooKeeper, String path, String data) throws KeeperException, InterruptedException 
        return zooKeeper.setData(path, data.getBytes(), -1);
    
    
    /**
     * 判断节点是否存在
     * @param zooKeeper
     * @param path 节点路径
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public Stat isExitZKPath(ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException 
        Stat stat = zooKeeper.exists(path, false);
        return stat;
    

 

4. 测试代码TestZK.java

  1)测试连接是否成功:

  //1.测试连接是否成功
    @Test
    public void testConnection() throws IOException, InterruptedException 
        ZKOperaDemo zkOperaDemo = new ZKOperaDemo();
        ZooKeeper zooKeeper = zkOperaDemo.connectionZooKeeper();
        System.out.println("====================");
        System.out.println(zooKeeper);
        System.out.println("====================");
        Thread.sleep(Long.MAX_VALUE);
    

 运行之后控制台打印的信息如下,最后两条可以看到一直在打印 ping ZooKeeper服务的信息(Got ping response for sessionid: 0x16c9968e7d0000e after 2ms  

[INFO ] 2019-08-21 09:42:56,068(0) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 06:50 GMT  
[INFO ] 2019-08-21 09:42:56,079(11) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:host.name=LAPTOP-L6EGT293  
[INFO ] 2019-08-21 09:42:56,079(11) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.version=1.8.0_60  
[INFO ] 2019-08-21 09:42:56,080(12) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.vendor=Oracle Corporation  
[INFO ] 2019-08-21 09:42:56,080(12) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.home=D:\\ProgramFiles\\JRE1.8  
[INFO ] 2019-08-21 09:42:56,081(13) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.class.path=D:\\eclipse_mySpaces\\zookeeperDemo\\target\\test-classes;D:\\eclipse_mySpaces\\zookeeperDemo\\target\\classes;D:\\ProgramFiles\\eclipse\\plugins\\org.junit_4.12.0.v201504281640\\junit.jar;D:\\ProgramFiles\\eclipse\\plugins\\org.hamcrest.core_1.3.0.v20180420-1519.jar;C:\\Users\\sun_flower\\.m2\\repository\\org\\apache\\zookeeper\\zookeeper\\3.4.9\\zookeeper-3.4.9.jar;C:\\Users\\sun_flower\\.m2\\repository\\org\\slf4j\\slf4j-api\\1.6.1\\slf4j-api-1.6.1.jar;C:\\Users\\sun_flower\\.m2\\repository\\org\\slf4j\\slf4j-log4j12\\1.6.1\\slf4j-log4j12-1.6.1.jar;C:\\Users\\sun_flower\\.m2\\repository\\log4j\\log4j\\1.2.16\\log4j-1.2.16.jar;C:\\Users\\sun_flower\\.m2\\repository\\jline\\jline\\0.9.94\\jline-0.9.94.jar;C:\\Users\\sun_flower\\.m2\\repository\\io\\netty\\netty\\3.10.5.Final\\netty-3.10.5.Final.jar;C:\\Users\\sun_flower\\.m2\\repository\\com\\101tec\\zkclient\\0.10\\zkclient-0.10.jar;D:\\ProgramFiles\\eclipse\\configuration\\org.eclipse.osgi\\407\\0\\.cp;D:\\ProgramFiles\\eclipse\\configuration\\org.eclipse.osgi\\406\\0\\.cp  
[INFO ] 2019-08-21 09:42:56,082(14) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.library.path=D:\\ProgramFiles\\JRE1.8\\bin;C:\\WINDOWS\\Sun\\Java\\bin;C:\\WINDOWS\\system32;C:\\WINDOWS;C:\\ProgramData\\Oracle\\Java\\javapath;C:\\Windows\\system32;C:\\Windows;C:\\Windows\\System32\\Wbem;C:\\Windows\\System32\\WindowsPowerShell\\v1.0\\;C:\\Windows\\System32\\OpenSSH\\;C:\\Program Files (x86)\\NVIDIA Corporation\\PhysX\\Common;C:\\Program Files\\NVIDIA Corporation\\NVIDIA NvDLISR;D:\\ProgramFiles\\JDK1.8\\bin;D:\\ProgramFiles\\JDK1.8\\jre\\bin;C:\\WINDOWS\\system32;C:\\WINDOWS;C:\\WINDOWS\\System32\\Wbem;C:\\WINDOWS\\System32\\WindowsPowerShell\\v1.0\\;C:\\WINDOWS\\System32\\OpenSSH\\;D:\\ProgramFiles\\apache-maven-3.3.9\\bin;D:\\ProgramFiles\\mysql5.0\\bin;C:\\Users\\sun_flower\\AppData\\Local\\Microsoft\\WindowsApps;;D:\\ProgramFiles\\IntelliJIDEA2018.3.5\\bin;;.  
[INFO ] 2019-08-21 09:42:56,082(14) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.io.tmpdir=C:\\Users\\SUN_FL~1\\AppData\\Local\\Temp\\  
[INFO ] 2019-08-21 09:42:56,083(15) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.compiler=<NA>  
[INFO ] 2019-08-21 09:42:56,083(15) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:os.name=Windows 10  
[INFO ] 2019-08-21 09:42:56,083(15) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:os.arch=amd64  
[INFO ] 2019-08-21 09:42:56,084(16) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:os.version=10.0  
[INFO ] 2019-08-21 09:42:56,084(16) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:user.name=sun_flower  
[INFO ] 2019-08-21 09:42:56,084(16) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:user.home=C:\\Users\\sun_flower  
[INFO ] 2019-08-21 09:42:56,084(16) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:user.dir=D:\\eclipse_mySpaces\\zookeeperDemo  
[INFO ] 2019-08-21 09:42:56,087(19) --> [main] org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:438): Initiating client connection, connectString=192.168.202.132:2181 sessionTimeout=50000 watcher=com.hxc.zookeeperDemo.ZKOperaDemo$1@6073f712  
[DEBUG] 2019-08-21 09:42:56,094(26) --> [main] org.apache.zookeeper.ClientCnxn.<clinit>(ClientCnxn.java:117): zookeeper.disableAutoWatchReset is false  
[INFO ] 2019-08-21 09:42:56,296(228) --> [main] org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:438): Initiating client connection, connectString=192.168.202.132:2181 sessionTimeout=50000 watcher=com.hxc.zookeeperDemo.ZKOperaDemo$1@6ea6d14e  
====================
State:CONNECTING sessionid:0x0 local:null remoteserver:null lastZxid:0 xid:1 sent:0 recv:0 queuedpkts:0 pendingresp:0 queuedevents:0
====================
[INFO ] 2019-08-21 09:42:56,317(249) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.logStartConnect(ClientCnxn.java:1032): Opening socket connection to server 192.168.202.132/192.168.202.132:2181. Will not attempt to authenticate using SASL (unknown error)  
[INFO ] 2019-08-21 09:42:56,317(249) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.logStartConnect(ClientCnxn.java:1032): Opening socket connection to server 192.168.202.132/192.168.202.132:2181. Will not attempt to authenticate using SASL (unknown error)  
[INFO ] 2019-08-21 09:42:56,325(257) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:876): Socket connection established to 192.168.202.132/192.168.202.132:2181, initiating session  
[DEBUG] 2019-08-21 09:42:56,327(259) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:949): Session establishment request sent on 192.168.202.132/192.168.202.132:2181  
[INFO ] 2019-08-21 09:42:56,333(265) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.onConnected(ClientCnxn.java:1299): Session establishment complete on server 192.168.202.132/192.168.202.132:2181, sessionid = 0x16c9968e7d0000d, negotiated timeout = 40000  
[INFO ] 2019-08-21 09:42:57,320(1252) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:876): Socket connection established to 192.168.202.132/192.168.202.132:2181, initiating session  
[DEBUG] 2019-08-21 09:42:57,321(1253) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:949): Session establishment request sent on 192.168.202.132/192.168.202.132:2181  
[INFO ] 2019-08-21 09:42:57,325(1257) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.onConnected(ClientCnxn.java:1299): Session establishment complete on server 192.168.202.132/192.168.202.132:2181, sessionid = 0x16c9968e7d0000e, negotiated timeout = 40000  
[DEBUG] 2019-08-21 09:43:09,671(13603) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16c9968e7d0000d after 6ms  
[DEBUG] 2019-08-21 09:43:10,656(14588) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16c9968e7d0000e after 2ms  

 如果控制台报如下错误(java.net.ConnectException: Connection refused: no further information),则可能是ZooKeeper服务没有开启或者防火墙没有关闭或防火墙没有开启2181端口

[WARN ] 2019-08-21 09:59:42,274(2080) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1162): Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect  
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
[DEBUG] 2019-08-21 09:59:42,276(2082) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxnSocketNIO.cleanup(ClientCnxnSocketNIO.java:203): Ignoring exception during shutdown input  

 

  2)测试连接ZooKeeper服务通后之后,就可以完整的测试其他功能了

  所有完整的测试代码:(测试创建节点、设置节点值、获取节点值、判断节点是否存在)

package com.hxc.zookeeperDemo;

import java.io.IOException;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

public class TestZK 
    
    //1.测试连接是否成功
    @Test
    public void testConnection() throws IOException, InterruptedException 
        ZKOperaDemo zkOperaDemo = new ZKOperaDemo();
        ZooKeeper zooKeeper = zkOperaDemo.connectionZooKeeper();
        System.out.println("====================");
        System.out.println(zooKeeper);
        System.out.println("====================");
        Thread.sleep(Long.MAX_VALUE);
    
    private ZKOperaDemo nodeOperation = new ZKOperaDemo();
    private ZooKeeper zooKeeper = null;
    
        try 
            zooKeeper = nodeOperation.connectionZooKeeper();
         catch (IOException e) 
            e.printStackTrace();
        
    
    //测试创建节点
    @Test
    public void testCreateZKNode() throws KeeperException, InterruptedException 
        String result = nodeOperation.createZKNode(zooKeeper, "/address", "ShenZhen");
        System.out.println(result);
    
    //测试获取节点数据
    @Test
    public void testGetZKNodeData() throws KeeperException, InterruptedException 
        String result = nodeOperation.getZKNodeData(zooKeeper, "/address");
        System.out.println(result);
    
    //测试设置节点数据
    @Test
    public void testSetZKNodeData() throws KeeperException, InterruptedException 
        Stat stat = nodeOperation.setZKNodeData(zooKeeper, "/address", "Shen Zhen update");
        System.out.println(stat);    //结果是二进制数据
        if(null != null)
        System.out.println(stat.getCversion());
    
    //测试节点是否存在
    @Test
    public void testIsExitZKPath() throws KeeperException, InterruptedException 
        Stat stat = nodeOperation.isExitZKPath(zooKeeper, "/addressaa");
        System.out.println(stat);    //结果是二进制数据  如果节点不存在,则返回null
        if(null != null)
            System.out.println(stat.getCversion());
    
    
    

  3)在Zookeeper服务上就能看到测试后的信息了:

            技术图片

 5. zookeeper的通知机制

  在初次建立连接和设置节点时均可设置观察者(监听),每一次的观察只使用一次,每次使用完观察若还想监听下次的操作,需要重新设置观察者。即Watcher设置到节点上之后是一次性的,通知一次之后就会失效。所以我们在通知的回调方法中接收执行通知操作后需要再继续设置一个Watcher。

  实现持续的观察代码部分:(递归回调)

  //通知机制
    /**
     *   监听节点 获取节点上的数据  
     * @param zooKeeper Zookeeper已经建立连接的对象
     * @param path 节点路径
     * @return 返回节点上的数据
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void getZKNodeData2(final ZooKeeper zooKeeper, final String path) throws KeeperException, InterruptedException 
        byte[] data = zooKeeper.getData(path, new Watcher() 
            
            public void process(WatchedEvent event) 
                try 
                    String data2 = ZKOperaDemo.process(zooKeeper, path);
                    System.out.println("第一次调用============= " + data2 + " =================");
                 catch (KeeperException e) 
                    e.printStackTrace();
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
        , new Stat());
        System.out.println("该节点" + path + "上的数据为: " + new String(data));
        Thread.sleep(Long.MAX_VALUE);
    
    
    public static String process(final ZooKeeper zooKeeper, final String path) throws KeeperException, InterruptedException 
        byte[] data = zooKeeper.getData(path, new Watcher() 
            public void process(WatchedEvent event) 
                try 
                    String data = ZKOperaDemo.process(zooKeeper, path);
                    System.out.println("============= " + data + " =================");
                 catch (KeeperException e) 
                    e.printStackTrace();
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
        , new Stat());
        return new String(data);
    
    

 测试代码:

   @Test
    public void testGetZKNodeData2() throws KeeperException, InterruptedException 
        nodeOperation.getZKNodeData2(zooKeeper, "/address");
    

启动了这个服务后,然后向节点设置值之后,就可以在控制台打印相应的监听信息:

  1)初次启动时,控制台打印的部分主要信息:

[DEBUG] 2019-08-21 16:08:16,230(95) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:843): Reading reply sessionid:0x16cb1e90922000e, packet:: clientPath:null serverPath:null finished:false header:: 1,4  replyHeader:: 1,512,0  request:: ‘/address,T  response:: #5368656e205a68656e20757064617465,s448,511,1566270677948,1566374882666,19,0,0,0,16,0,448   
该节点/address上的数据为: Shen Zhen update
[DEBUG] 2019-08-21 16:08:29,564(13429) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16cb1e90922000e after 2ms  

  2)在服务端或者在测试获取节点值的部分重新设置:(为了更直观查看,在服务端设置值)

            技术图片

  然后控制台上马上打印对应的观察者信息:

[DEBUG] 2019-08-21 16:10:27,104(130969) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:843): Reading reply sessionid:0x16cb1e90922000e, packet:: clientPath:null serverPath:null finished:false header:: 2,4  replyHeader:: 2,515,0  request:: ‘/address,T  response:: #6265696a696e67,s448,515,1566270677948,1566375027091,20,0,0,0,7,0,448   
第一次调用============= beijing =================
[DEBUG] 2019-08-21 16:10:40,437(144302) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16cb1e90922000e after 1ms  

 

   3)再次调用:

         技术图片

  控制台打印的信息:(会持续的监听)

[DEBUG] 2019-08-21 16:12:36,498(260363) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:843): Reading reply sessionid:0x16cb1e90922000e, packet:: clientPath:null serverPath:null finished:false header:: 3,4  replyHeader:: 3,516,0  request:: ‘/address,T  response:: #7368616e67686169,s448,516,1566270677948,1566375156485,21,0,0,0,8,0,448   
============= shanghai =================
[DEBUG] 2019-08-21 16:12:49,832(273697) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16cb1e90922000e after 1ms  

  

以上就是简单的java操作zookeeper的方式

 

以上是关于Java代码操作zookeeper的主要内容,如果未能解决你的问题,请参考以下文章

找不到自动清除的 Zookeeper java 类

Apache Curator操作zookeeper的API使用

Zookeeper客户端java代码操作

我的 Java 代码如何读取操作系统环境变量? [复制]

通过java代码实现调用excel当中的宏的操作。

java 操作系统和架构检测i java代码。