Zookeeper2.基于zk的开发入门
Posted 纵横千里,捭阖四方
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper2.基于zk的开发入门相关的知识,希望对你有一定的参考价值。
目录
只通过命令玩zk是不够的,我们需要将其集成到业务代码里,很多公司会将zk的相关操作封装到一个公共的包中,不必每个服务都自己写一套。现在我们看一下如何基于zk进行二次开发。
2.1 搭建开发环境
首先建立一个pom工程,然后在pom.xml中添加如下配置:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>note</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<artifactId>zookeeper</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.14</version>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<systemPropertyVariables>
<buildTestDir>$java.io.tmpdir</buildTestDir>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.1.0-incubating</version>
</dependency>
</dependencies>
</project>
之后我们写一个简单的类:HelloWorld.java
package com.lqc.zk.basic;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class HelloWorld implements Watcher
ZooKeeper zk;
String hostPort;
HelloWorld(String hostPort)
this.hostPort = hostPort;
void startZK() throws IOException
zk = new ZooKeeper(hostPort, 15000, this);
public void process(WatchedEvent e)
System.out.println(e);
public static void main(String args[])
throws Exception
HelloWorld m = new HelloWorld("127.0.0.1:2181");
m.startZK();
System.out.println("start success");
我们开启本地的zk服务之后,执行上面的代码就可看到连接zk服务器成功。上面代码中,为了从ZooKeeper接收通知,我们需要实现监视点,所以必须实现zk的Watcher接口。
为了看到zk的详细日志信息,可以在resource下增加一个日志的配置文件: log4j.properties:
zkbook.root.logger=INFO,CONSOLE
zkbook.log.dir=.
zkbook.log.file=zkbook-example.log
log4j.rootLogger=$zkbook.root.logger
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%dISO8601 - %-5p - [%t:%C1@%L] - %m%n
log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=INFO
log4j.appender.ROLLINGFILE.File=$zkbook.log.dir/$zkbook.log.file
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%dISO8601 - %-5p - [%t:%C1@%L] - %m%n
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%dISO8601 - %-5p [%t:%C1@%L] - %m%n
log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
log4j.appender.TRACEFILE.Threshold=TRACE
log4j.appender.TRACEFILE.File=zkbook-trace.log
log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.TRACEFILE.layout.ConversionPattern=%dISO8601 - %-5p [%t:%C1@%L][%x] - %m%n
这样再执行就可以看到zk启动时的关键信息。
2.2 获取管理权
如果建立会话成功了,我们接下来就尝试获得zk树的管理权,这样就可以进行增删改查等工作了。
我们首先看一下如何添加一个临时结点,并读到该结点的基本信息。我们使用ZooKeeper来实现简单的群首选举算法。也就是大家都来创建一个公共的结点。这个算法中,所有潜在的主节点进程尝试创建/master节点,但只有一个成 功,这个成功的进程成为主点。 常量ZooDefs.Ids.OPEN_ACL_UNSAFE为所有人提供了所有权限 (正如其名所显示的,这个ACL策略在不可信的环境下使用是非常不安 全的)。 ZooKeeper通过插件式的认证方法提供了每个节点的ACL策略功能,如果我们需要,就可以限制某个用户对某个znode节点的哪些权限,但对于这个简单的例子,我们继续使用OPEN_ACL_UNSAFE 策略。当然,我们希望在主节点死掉后/master节点会消失,从而可以重新选择主结点等,为此,可以使用ZooKeeper 的临时性znode节点来达到我们的目的。我们将定义一个EPHEMERAL 的znode节点,当创建它的会话关闭或无效时,ZooKeeper会自动检测到,并删除这个节点。这个对应的代码就是:
zk.create("/master",
serverId.getBytes(),
OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
上面的代码我们解析一下:
1.在上面代码,我们试着创建znode节点/master。如果这个znode节点存在,create 就会失败。同时我们想在/master节点的数据字段保存对应这个服务器的唯一ID。
2.数据字段只能存储字节数组类型的数据,所以我们将int型转换为一个字节数组。
3.如之前所提到的,我们使用开放的ACL策略。
4.我们创建的节点类型为EPHEMERAL,这种类型的结点在连接断开后就消失了,因此方便我们多次测试。
然而,这样做还不够,create方法会抛出两种异常:KeeperException和InterruptedException。特别是ConnectionLossException(KeeperException异常的子类)和InterruptedException。其他异常可以,但这两种异常,create方法可能已经成功了,所以如果我们作为主节点就需要捕获并处理它们。
ConnectionLossException异常发生于客户端与ZooKeeper服务端失去连接时。InterruptedException异常源于客户端线程调用了Thread.interrupt,通常这是因为应用程序部分关闭,但还在被其他相关应用的方法使用。处理ConnectionLossException异常时,我们需要找出哪个进程创建的/master节点,如果进程是自己,就开始成为群首角色。
我们通过 getData方法来处理:
byte[] getData(
String path,
bool watch,
Stat stat)
其中:
path类似其他ZooKeeper方法一样,第一个参数为我们想要获取数据的 znode节点路径。
watch表示我们是否想要监听后续的数据变更。如果设置为true,我们就可以通过我们创建ZooKeeper句柄时所设置的Watcher对象得到事件,同时另一个版本的方法提供了以Watcher对象为入参,通过这个传入的对象来接收变更的事件。我们在后续章节再讨论如何监视变更情况,现在我们设置这个参数为false,因为我们现在我们只想知道当前的数据是什么。
stat最后一个参数类型Stat结构,getData方法会填充znode节点的元数据信息。
返回值方法返回成功(没有抛出异常),就会得到znode节点数据的字节数组。
我们在runForMaster()方法中完成上述相关操作:
void runForMaster() throws KeeperException, InterruptedException
while (true)
try
zk.create("/master",
serverId.getBytes(),
OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
isLeader = true;
break;
catch (Exception ex)
isLeader = false;
System.out.println(ex);
if (checkMaster())
break;
上面的checkMaster()方法用来检查是否能够获取到创建的结点,并且检测自己是否为主结点 。代码如下:
boolean checkMaster()
while (true)
try
Stat stat = new Stat();
byte data[] = zk.getData("/master", false, stat);
isLeader = new String(data).equals(serverId);
System.out.println("data: " + Arrays.toString(data));
return true;
catch (InterruptedException e)
e.printStackTrace();
return false;
catch (KeeperException e)
e.printStackTrace();
return false;
本演示的完整代码:
public class ACLMaster implements Watcher
ZooKeeper zk;
String hostPort;
String serverId = "";
boolean isLeader = false;
ACLMaster(String hostPort)
this.hostPort = hostPort;
Random random = new Random();
serverId = Integer.toHexString(random.nextInt());
boolean checkMaster()
while (true)
try
Stat stat = new Stat();
byte data[] = zk.getData("/master", false, stat);
isLeader = new String(data).equals(serverId);
System.out.println("data: " + Arrays.toString(data));
return true;
catch (InterruptedException e)
e.printStackTrace();
return false;
catch (KeeperException e)
e.printStackTrace();
return false;
void runForMaster() throws KeeperException, InterruptedException
while (true)
try
zk.create("/master",
serverId.getBytes(),
OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
isLeader = true;
break;
catch (Exception ex)
isLeader = false;
System.out.println(ex);
if (checkMaster())
break;
void startZK() throws IOException
zk = new ZooKeeper(hostPort, 15000, this);
public void process(WatchedEvent e)
System.out.println(e);
void stopZK() throws Exception
zk.close();
public static void main(String args[])
throws Exception
ACLMaster m = new ACLMaster("127.0.0.1:2181");
m.startZK();
m.runForMaster();
if (m.isLeader)
System.out.println(" I'm the leader");
Thread.sleep(60000);
else
System.out.println(" I will be the leader");
System.out.println("start success");
m.stopZK();
2.3 异步获取管理权
ZooKeeper中,所有同步调用方法都有对应的异步调用方法。通过异步调用,我们可以在单线程中同时进行多个调用,也可以简化我们的实现方式。本小节我们就将上面的代码修改为异步调用的方式。
create方法的异步调用版本定义如下:
void create(String path,
byte[] data,
List<ACL> acl,
CreateMode createMode, AsyncCallback.StringCallback cb,
Object ctx)
create方法的异步方法与同步方法非常相似,仅仅多了两个参数: StringCallback类型的cb提供回调方法的对象。 2.ctx用户指定上下文信息(回调方法调用是传入的对象实例)。
该方法调用后通常在create请求发送到服务端之前就会立即返回。 回调对象通过传入的上下文参数来获取数据,当从服务器接收到create 请求的结果时,上下文参数就会通过回调对象提供给应用程序。
注意,该create方法不会抛出异常,因为调用返回前并不会等待create命令完成,所以我们无需关心InterruptedException异常;同时因请求的所有错误信息通过回调对象会第一个返回,所以我们也无需关心KeeperException异常。
回调对象实现只有一个方法的StringCallback接口:
void processResult(int rc, String path, Object ctx, String name)
异步方法调用会简单化队列对ZooKeeper服务器的请求,并在另一个线程中传输请求。当接收到响应信息,这些请求就会在一个专用回调线程中被处理。为了保持顺序,只会有一个单独的线程按照接收顺序处理响应包。
processResult各个参数的含义如下:
rc
返回调用的结构,返回OK或与KeeperException异常对应的编码 值。
path
我们传给create的path参数值。 ctx 我们传给create的上下文参数。
name
创建的znode节点名称。目前,调用成功后,path和name的值一样,但是,如果采用 CreateMode.SEQUENTIAL模式,这两个参数值就不会相等。
因为只有一个单独的线程处理所有回调调用,如果回调函数阻塞, 所有后续回调调用都会被阻塞,因此,一般不要在回调函数中集中操作或阻塞操作。
让我们继续完成我们的主节点的功能,我们创建了 masterCreateCallback对象,用于接收create命令的结果:
static AsyncCallback.StringCallback masterCreateCallback = new AsyncCallback.StringCallback()
@Override
public void processResult(int rc, String path, Object ctx, String name)
switch (KeeperException.Code.get(rc))
case CONNECTIONLOSS:
checkMaster();
return;
case OK:
isLeader = true;
break;
default:
isLeader = false;
System.out.println(" I'm leader: " + isLeader);
;
在上面的代码中,从rc参数中获得create请求的结果,并将其转换为Code枚举类 型。rc如果不为0,则对应KeeperException异常。如果因连接丢失导致create请求失败,会得到 CONNECTIONLOSS编码的结果,而不是ConnectionLossException异常。当连接丢失时,需要检查系统当前的状态,并判断需要如何恢复,我们将会在后面实现的checkMaster方法中进行处理。如果现在成为群首,应先简单地赋值isLeader为true。最后,在runForMaster方法中,将masterCreateCallback传给create方法,传入null作为上下文对象参数,因为在runForMaster方法中,我们现在不需要向masterCreateCallback.processResult方法传入任何信息。
我们现在需要实现checkMaster方法,这个方法与之前的同步情况不太一样,我们通过回调方法实现处理逻辑,因此在checkMaster函数中不会看到一系列的事件,而只有getData方法。getData调用完成后,后续处理将会在DataCallback对象中继续:
static AsyncCallback.DataCallback masterCheckCalllback = new AsyncCallback.DataCallback()
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat)
switch (KeeperException.Code.get(rc))
case CONNECTIONLOSS:
checkMaster();
return;
case NONODE:
runForMaster();
return;
;
同步方法和异步方法的处理逻辑是一样的,只是异步方法中,我们 没有使用while循环,而是通过异步操作在回调函数中进行错误处理。
此时,同步的版本看起来比异步版本实现起来更简单,但在应用程序常常由异步变化通知所驱动,因此最终以异步方式构建系统,反而使代码更简单。同时,异步调用不会阻塞应用程序, 这样其他事务可以继续进行,甚至是提交新的ZooKeeper操作。
本案例的完整代码:
public class AsynACLMaster implements Watcher
static ZooKeeper zk;
String hostPort;
static String serverId = "";
static boolean isLeader = false;
AsynACLMaster(String hostPort)
this.hostPort = hostPort;
Random random = new Random();
serverId = Integer.toHexString(random.nextInt());
static AsyncCallback.StringCallback masterCreateCallback = new AsyncCallback.StringCallback()
@Override
public void processResult(int rc, String path, Object ctx, String name)
switch (KeeperException.Code.get(rc))
case CONNECTIONLOSS:
checkMaster();
return;
case OK:
isLeader = true;
break;
default:
isLeader = false;
System.out.println(" I'm leader: " + isLeader);
;
static AsyncCallback.DataCallback masterCheckCalllback = new AsyncCallback.DataCallback()
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat)
switch (KeeperException.Code.get(rc))
case CONNECTIONLOSS:
checkMaster();
return;
case NONODE:
runForMaster();
return;
;
static void checkMaster()
zk.getData("/master", false, masterCheckCalllback, null);
static void runForMaster()
zk.create("/master", serverId.getBytes(), OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL, masterCreateCallback, null);
void startZK() throws IOException
zk = new ZooKeeper(hostPort, 15000, this);
public void process(WatchedEvent e)
System.out.println(e);
void stopZK() throws Exception
zk.close();
public static void main(String args[])
throws Exception
AsynACLMaster m = new AsynACLMaster("127.0.0.1:2181");
m.startZK();
m.runForMaster();
System.out.println("start success");
m.stopZK();
2.4 创建从结点
现在我们已经有了主节点,我们需要配置从节点,以便主节点可以 发号施令。每个从节点会在/workers下创建一个临时性的znode节点,很简单,通过以下代码就可以实现。我们将使用 znode节点中的数据,来指示从节点的状态:
public class CreateSlaveNodeWorker implements Watcher
private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
ZooKeeper zk;
String hostPort;
String serverId;
String status;
CreateSlaveNodeWorker(String hostPort)
this.hostPort = hostPort;
Random random = new Random();
serverId = Integer.toHexString(random.nextInt());
void startZK() throws IOException
zk = new ZooKeeper(hostPort, 15000, this);
public void process(WatchedEvent e)
LOG.info(e.toString() + ", " + hostPort);
void register()
zk.create("/workers/worker" + serverId, "Idle".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, createWorkerCallback, null);
AsyncCallback.StringCallback createWorkerCallback = new AsyncCallback.StringCallback()
@Override
public void processResult(int rc, String path, Object ctx, String name)
switch (KeeperException.Code.get(rc))
case CONNECTIONLOSS:
register();
break;
case OK:
LOG.info("registed successs" + serverId);
break;
default:
LOG.info("registed failed " + KeeperException.create(KeeperException.Code.get(rc), path));
break;
;
public static void main(String[] args) throws Exception
CreateSlaveNodeWorker worker = new CreateSlaveNodeWorker("127.0.0.1:2181");
worker.startZK();
worker.register();
Thread.sleep(3000);
上面将从节点的状态信息存入代表从节点的znode节点中。如果进程死掉,我们希望能清理代表从节点的znode节点,因此这里还是将结点设置为EPHEMERAL类型,如果创建节点时连接丢失,进程会简单地重试创建过程。我们将从节点状态信息存入了代表从节点的znode节点,这样就可以通过查询ZooKeeper来获得从节点的状态。
2.5 任务队列化
系统最后的组件为Client应用程序队列化新任务,以便从节点执行这些任务,我们会在/tasks节点下添加子节点来表示从节点需要执行的命令。这里使用有序节点,这样做有两个好处,第一,序列号指定了任务被队列化的顺序;第二,可以通过很少的工作为任务创建基 于序列号的唯一路径。
Client1代码如下:
public class Client1 implements Watcher
ZooKeeper zk;
public Client1()
void startZK() throws Exception
zk = new ZooKeeper(HostPort.hostPort, 15000, this);
String queueCommand(String command)
while (true)
String name = null;
try
name = zk.create("/tasks/task-", command.getBytes(), OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
catch (KeeperException e)
e.printStackTrace();
catch (InterruptedException e)
e.printStackTrace();
return name;
@Override
public void process(WatchedEvent event)
System.out.println(event);
public static void main(String[] args) throws Exception
Client1 c = new Client1();
c.startZK();
String name = c.queueCommand("task");
System.out.println("Created " + name);
我们在/tasks节点下创建znode节点来标识一个任务,节点名称前缀为task-。因为我们使用的是CreateMode.EPHEMERAL_SEQUENTIAL模式的节点,task-后面会跟随一个单调递增的数字,这样就可以保证为每个任务创建的 znode节点的名称是唯一的,同时ZooKeeper会确定任务的顺序。
当我们运行Client应用程序并发送一个命令时,/tasks节点下就会创建一个新的znode节点,该节点也是临时性节点,因此Client1程序结束后,这个节点就不会存在。
2.6 管理客户端
最后,我们来写一个简单的AdminClient,通过该程序来展示系 统的运行状态。ZooKeeper优点之一是我们可以通过zkCli工具来查看系统的状态,但是通常希望编写自己的管理客户端,以便更快更简单地管理系统。在本例中,我们通过getData和getChildren方法来获得主从系统的运行状态。
这些方法的使用非常简单,因为这些方法不会改变系统的运行状态,我们仅需要简单地传播我们遇到的错误,而不需要进行任何清理操作。
该示例使用了同步调用的方法,这些方法还有一个watch参数,我 们置为false值,因为我们不需要监视变化情况,只是想获得系统当前的运行状态。在下一章中我们将会看到如何使用这个参数来跟踪系统的变化情况。现在,让我们看一下AdminClient的代码:
public class AdminClient implements Watcher
ZooKeeper zk;
void start() throws Exception
zk = new ZooKeeper(hostPort, 15000, this);
void listState() throws KeeperException, InterruptedException
try
Stat stat = new Stat();
byte masterData[] = zk.getData("/master", false, stat);//①
Date startDate = new Date(stat.getCtime());//②
System.out.println("Master: " + new String(masterData) + " since " + startDate);
catch (Exception e)
System.out.println("No Master");
System.out.println("Workers:");
for (String w : zk.getChildren("/workers", false))
byte data[] = zk.getData("/workers/" + w, false, null);//③
String state = new String(data);
System.out.println("\\t" + w + ": " + state);
System.out.println("Tasks:");
for (String t : zk.getChildren("/assign", false))
System.out.println("\\t" + t);
@Override
public void process(WatchedEvent event)
System.out.println(event);
public static void main(String[] args) throws Exception
AdminClient adminClient = new AdminClient();
adminClient.start();
adminClient.listState();
在上面的代码中,①处我们在/master节点中保存了主节点名称信息,因此我们从/master 中获取数据,以获得当前主节点的名称。因为我们并不关心变化情况,所以我们将第二个参数置为false。
②我们通过Stat结构,可以获得当前主节点成为主节点的时间信 息。ctime为该znode节点建立时的秒数(系统纪元以来的秒数,即自 1970年1月1日00:00:00UTC的描述)。
③临时节点含有两个信息:指示当前从节点正在运行;其数据表示 从节点的状态。
本文的AdminClient例子还是非常简单的,该程序简单地通过数据结构获得在主从示例的信息。我们试着可以启停Master程序、Worker程序,多次运行 Client来队列化一些任务,AdminClient会展示系统的这些变化的状态。
以上面的Master、Worker、Client这些的基本实现带领我们进入了 主从系统的开端,但到目前为止还没有实际调度起来。当一个任务加入队列,主节点需要唤醒并分配任务给一个从节点,从节点需要找出分配 给自己的任务,任务完成时,客户端需要及时知道,如果主节点故障, 另一个等待中的主节点需要接管主节点工作。如果从节点故障,分配给 这个从节点的任务需要分配给其他从节点,在下一章中,我们将会讨论 这些必要功能的实现。
以上是关于Zookeeper2.基于zk的开发入门的主要内容,如果未能解决你的问题,请参考以下文章
开源周荐分布式配置管理神器Qihoo360/QConf入门指北(部署配置使用架构原理)
开源周荐分布式配置管理神器Qihoo360/QConf入门指北(部署配置使用架构原理)