Zookeeper模拟实现集群配置信息的订阅与发布(Hadoop)
Posted 木头㉿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper模拟实现集群配置信息的订阅与发布(Hadoop)相关的知识,希望对你有一定的参考价值。
一、项目背景
在集群配置管理中,在创痛的方式下,如果要修改集群中每个节点的配置信息,操作比较繁琐。首先需要修改相应的配置文件,然后逐步更新到集群中的各个节点。如果集群规模很大,比如有100个节点,那么修改后的配置文件需要逐步更新到这100个节点。这个过程不仅消耗时间,而且很容易遗漏某些节点,从而造成集群节点配置信息不一致的问题。
二、项目需求
在集群配置管理中,一般企业内部都会实现一套集中的配置管理中心,应对不同的应用集群对于共享各自的配置需求,并且在配置变更时能够通知到集群中的每个机器。
三、项目思路分析
为了实现配置信息的集中式管理和动态更新,采用发布/订阅模式将配置信息发布到Zookeeper节点上,供订阅者动态获取数据。为了模拟实现集群配置信息的订阅发布,具体实现思路如下所示:
1、首先需要启动Zookeeper服务,规划集群配置信息存放的节点/config。
2、然后通过ConfigWatcher类更新/config节点注册监视器watcher,监控集群配置信息变化。
3、最后通过ConfigUpdater类不断更新/config节点配置信息,从而模拟实现集群配置信息订阅发布效果。
三、代码实现
项目结构如下:
1、创建ConnectionWatcher类i连接Zookeeper
package com.dajiangtai.zookeeper.util;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
/**
* zookeeper连接
* @author dajiangtai
*
*/
public class ConnectionWatcher implements Watcher {
private static final int SESSION_TIMEOUT = 5000;
protected ZooKeeper zk;
private CountDownLatch _connectedSignal = new CountDownLatch(1);
public void connect(String hosts) throws IOException, InterruptedException {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
_connectedSignal.await();
}
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
_connectedSignal.countDown();
}
}
public void close() throws InterruptedException {
zk.close();
}
}
2、创建ActiveKeyValueStore类读写Zookeeper节点数据
package com.dajiangtai.zookeeper.util;
import java.nio.charset.Charset;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
/**
* 读写Zookeeper数据
* @author dajiangtai
*
*/
public class ActiveKeyValueStore extends ConnectionWatcher {
private static final Charset CHARSET = Charset.forName("UTF-8");
/**
* 注册永久节点
* @param path
* @param value
* @throws InterruptedException
* @throws KeeperException
*/
public void write(String path, String value) throws InterruptedException,
KeeperException {
Stat stat = zk.exists(path, false);
if (stat == null) {
if(value == null){
zk.create(path, null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}else{
zk.create(path, value.getBytes(CHARSET),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} else {
if(value == null){
zk.setData(path, null, -1);
}else{
zk.setData(path, value.getBytes(CHARSET), -1);
}
}
}
/**
* 注册临时有序节点
* @param path
* @param value
* @throws InterruptedException
* @throws KeeperException
*/
public void writeEphemeralNode(String path, String value) throws InterruptedException,
KeeperException {
Stat stat = zk.exists(path, false);
if (stat == null) {
if(value == null){
zk.create(path, null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}else{
zk.create(path, value.getBytes(CHARSET),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
} else {
if(value == null){
zk.setData(path, null, -1);
}else{
zk.setData(path, value.getBytes(CHARSET), -1);
}
}
}
/**
* 注册临时有序节点
* @param path
* @param value
* @throws InterruptedException
* @throws KeeperException
*/
public void writeEPHEMERAL_SEQUENTIAL_Node(String path, String value) throws InterruptedException,
KeeperException {
Stat stat = zk.exists(path, false);
if (stat == null) {
if(value == null){
zk.create(path, null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}else{
zk.create(path, value.getBytes(CHARSET),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
} else {
if(value == null){
zk.setData(path, null, -1);
}else{
zk.setData(path, value.getBytes(CHARSET), -1);
}
}
}
/**
* 注册持久有序节点
* @param path
* @param value
* @throws InterruptedException
* @throws KeeperException
*/
public void writePERSISTENT_SEQUENTIAL_Node(String path, String value) throws InterruptedException,
KeeperException {
Stat stat = zk.exists(path, false);
if (stat == null) {
if(value == null){
zk.create(path, null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
}else{
zk.create(path, value.getBytes(CHARSET),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
}
} else {
if(value == null){
zk.setData(path, null, -1);
}else{
zk.setData(path, value.getBytes(CHARSET), -1);
}
}
}
/**
* 判断节点是否存在
* @param path
* @param value
* @throws KeeperException
* @throws InterruptedException
*/
public void exists(String path, String value) throws KeeperException, InterruptedException{
Stat stat = zk.exists(path, false);
if (stat == null) {
if(value == null){
zk.create(path, null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}else{
zk.create(path, value.getBytes(CHARSET),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
}
public void registerWatcher(String path ,Watcher watcher){
try {
zk.exists(path, watcher);
} catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void delete(String path){
try {
zk.delete(path, -1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 读取节点数据
* @param path
* @param watcher
* @return
* @throws InterruptedException
* @throws KeeperException
*/
public String read(String path, Watcher watcher)
throws InterruptedException, KeeperException {
byte[] data = zk.getData(path, watcher, null /* stat */);
return new String(data, CHARSET);
}
/**
* 获取所有子节点
* @param path
* @param watcher
* @return
* @throws InterruptedException
* @throws KeeperException
*/
public List<String> readChildren(String path, Watcher watcher)
throws InterruptedException, KeeperException {
List<String> childrens = null;
if(watcher == null){
childrens = zk.getChildren(path, false);
}else{
childrens = zk.getChildren(path, watcher, null);
}
return childrens;
}
}
3、创建ConfigUpdater类发布数据信息
package com.dajiangtai.zookeeper.configmanage;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.KeeperException;
import com.dajiangtai.zookeeper.util.ActiveKeyValueStore;
/**
* Updater 更新数据
* @author dajiangtai
*
*/
public class ConfigUpdater {
public static final String PATH = "/configuration";
private ActiveKeyValueStore _store;
private Random _random = new Random();
public ConfigUpdater(String hosts) throws IOException, InterruptedException {
_store = new ActiveKeyValueStore();//定义一个类
_store.connect(hosts);//连接Zookeeper
}
public void run() throws InterruptedException, KeeperException {
// noinspection InfiniteLoopStatement
while (true) {
String value = _random.nextInt(100) + "";
_store.write(PATH, value);//向znode写数据,也可以将xml文件写进去
System.out.printf("Set %s to %s\\n", PATH, value);
TimeUnit.SECONDS.sleep(_random.nextInt(10));
}
}
public static void main(String[] args) throws IOException,
InterruptedException, KeeperException {
String hosts="192.168.254.129:2181";
ConfigUpdater updater = new ConfigUpdater(hosts);
updater.run();
}
}
4、创建ConfigWatcher类订阅数据信息
package com.dajiangtai.zookeeper.configmanage;
import java.io.IOException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import com.dajiangtai.zookeeper.util.ActiveKeyValueStore;
/**
* Zookeeper-配置管理
* Watcher 监控
* @author dajiangtai
*
*/
public class ConfigWatcher implements Watcher {
private ActiveKeyValueStore _store;
public ConfigWatcher(String hosts) throws InterruptedException, IOException {
_store = new ActiveKeyValueStore();
//连接Zookeeper
_store.connect(hosts);
}
/**
* 读取znode节点数据
* @throws InterruptedException
* @throws KeeperException
*/
public void displayConfig() throws InterruptedException, KeeperException {
String value = _store.read(ConfigUpdater.PATH, this);
System.out.printf("Read %s as %s\\n", ConfigUpdater.PATH, value);
}
/**
* 监控znode数据变化
*/
public void process(WatchedEvent event) {
System.out.printf("Process incoming event: %s\\n", event.toString());
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
displayConfig();
} catch (InterruptedException e) {
System.err.println("Interrupted. Exiting");
Thread.currentThread().interrupt();
} catch (KeeperException e) {
System.err.printf("KeeperException: %s. Exiting.\\n", e);
}
}
}
public static void main(String[] args) throws IOException,
InterruptedException, KeeperException {
String hosts="192.168.254.129:2181";
//创建watcher
ConfigWatcher watcher = new ConfigWatcher(hosts);
//调用display方法
watcher.displayConfig();
//然后一直处于监控状态
Thread.sleep(Long.MAX_VALUE);
}
}
四、项目运行
项目代码编写完成之后,可以开始测试运行,模拟实现集群配置信息的订阅发布。
1、启动zookeeper
[hadoop@djt ~]$ cd app/zookeeper
[hadoop@djt zookeeper]$ bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/hadoop/app/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@djt zookeeper]$ jps
2504 Jps
2463 QuorumPeerMain
[hadoop@djt zookeeper]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/hadoop/app/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
[hadoop@djt zookeeper]$
2、订阅者订阅信息
3、客户端发布信息
在ConfigUpdater类中,执行main方法向/config节点循环发布信息,比如循环发布如下信息。
此时ConfigWatcher会实时监控/config节点信息的变化,及时从服务器端拉取新的信息如下所示。
通过ConfigUpdater发布的信息以及ConfigWatcher监控得到的信息可以看出,已经成功模拟实现集群配置信息的订阅发布。
以上是关于Zookeeper模拟实现集群配置信息的订阅与发布(Hadoop)的主要内容,如果未能解决你的问题,请参考以下文章