A First Look at Connecting to ZooKeeper from Java
Posted by 为你撑起一片天
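Connecting to ZooKeeper from Java
1. Setting up the ZooKeeper environment
- I used virtual machines and created two Linux servers (installation guides are easy to find online).
- Prepare the ZooKeeper package, zookeeper-3.4.10.tar.gz. It can be downloaded from the Apache website; I have also put a copy on Baidu Cloud: https://pan.baidu.com/s/15icVROSKpgwUzqzpHW6Rbg (password: dgnp)
- Installation steps
- Prerequisites: install the JDK, configure hosts and the hostname, and turn off the firewall
- $ZOOKEEPER = /home/zookeeper_application/zookeeper-3.4.10
- Run tar -xvzf zookeeper-3.4.10.tar.gz; the resulting directory is /home/zookeeper_application/zookeeper-3.4.10
- cd into zookeeper-3.4.10 and run mkdir data to create the data folder. cd into data, run touch myid to create the myid file, then edit it with vi myid. The myid file stores the server's id, which can be 1, 2, 3, … and must be unique
- Create $ZOOKEEPER/conf/zoo.cfg
Edit zoo.cfg and add the following:
dataDir=/home/zookeeper_application/zookeeper-3.4.10/data
clientPort=2181
initLimit=10
syncLimit=5
tickTime=2000
server.1=master:2888:3888
server.2=slave1:2888:3888
- Each server.X entry names one of the machines that make up the service. When a server starts, it looks for the myid file in its data directory; that file holds the server's number (the X).
- Configure /etc/profile:
- export ZOOKEEPER_HOME=/home/zookeeper_application/zookeeper-3.4.10
- export PATH=$PATH:/usr/local/jdk1.7/bin:$ZOOKEEPER_HOME/bin
- Configure the other server the same way, with myid set to 2
- Start / status / stop: in the bin directory, run zkServer.sh start|status|stop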
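The relationship between the server.X lines and the myid file can be checked before starting the servers. The following is a minimal sketch using only the JDK; the class ZooCfgCheck and its methods are hypothetical helpers written for this illustration, not part of ZooKeeper, and it assumes the zoo.cfg content and the myid content are already available as strings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class ZooCfgCheck {

    /** Returns server id -> "host:port:port" for every server.X line in zoo.cfg. */
    public static Map<Integer, String> serverEntries(String zooCfg) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(zooCfg));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<Integer, String> servers = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                int id = Integer.parseInt(key.substring("server.".length()));
                servers.put(id, props.getProperty(key).trim());
            }
        }
        return servers;
    }

    /** True when the myid content is a number that has a matching server.X entry. */
    public static boolean myidMatches(String myidContent, Map<Integer, String> servers) {
        try {
            return servers.containsKey(Integer.parseInt(myidContent.trim()));
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String cfg = String.join("\n",
                "dataDir=/home/zookeeper_application/zookeeper-3.4.10/data",
                "clientPort=2181",
                "server.1=master:2888:3888",
                "server.2=slave1:2888:3888");
        Map<Integer, String> servers = serverEntries(cfg);
        System.out.println(servers);
        System.out.println(myidMatches("2\n", servers));
    }
}
```

A myid value with no matching server.X entry (for example 3 with the configuration above) is reported as a mismatch, which is the situation the uniqueness rule is meant to avoid.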
2. The Java part
Create zk.properties
zk.server=192.168.2.111:2181
zk.authentication_type=digest
zk.correct_authentication=rightKey
zk.bad_authentication=wrongKey
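With the digest scheme, the server does not compare the raw key. To my understanding, it hashes the authentication string with SHA-1, Base64-encodes the result and uses that as the identity in the ACL. The sketch below reproduces that derivation with the JDK only, so it is easy to see why two different keys never match. DigestSketch and digestId are hypothetical names, and the exact derivation is an assumption based on ZooKeeper's DigestAuthenticationProvider rather than something stated in this article.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class DigestSketch {

    /**
     * Derives "<id>:<base64(sha1(authString))>" from an auth string.
     * The id is the part before the first ':' (the whole string when there is none).
     */
    public static String digestId(String authString) {
        String id = authString.split(":", 2)[0];
        try {
            byte[] sha1 = MessageDigest.getInstance("SHA-1")
                    .digest(authString.getBytes(StandardCharsets.UTF_8));
            return id + ":" + Base64.getEncoder().encodeToString(sha1);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a standard JDK algorithm, so this is not expected in practice.
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    public static void main(String[] args) {
        // "rightKey" is the value of zk.correct_authentication above;
        // "anotherKey" stands for any other string.
        System.out.println(digestId("rightKey"));
        System.out.println(digestId("anotherKey"));
    }
}
```

The two printed identities differ, so a node whose ACL was created from the first one would not grant access to a client that authenticated with the second.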
ZkConfig.java
package org.seckill.zk.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Loads zk.properties from the classpath once and exposes its values.
 *
 * @author mxn
 * @create 2018-07-16 17:22
 */
public class ZkConfig {
    public static final Logger logger = LoggerFactory.getLogger(ZkConfig.class);
    public static final String FILE_NAME = "zk.properties";

    private String server;
    private String authenticationType;
    private String correctAuthentication;
    private String badAuthentication;

    /**
     * Keys used in the properties file.
     */
    public static final String zk_server = "zk.server";
    public static final String zk_authentication_type = "zk.authentication_type";
    public static final String zk_correct_authentication = "zk.correct_authentication";
    public static final String zk_bad_authentication = "zk.bad_authentication";

    /**
     * The singleton instance.
     */
    private static ZkConfig config = new ZkConfig();

    private ZkConfig() {
    }

    private Properties properties;

    /**
     * Returns the config object.
     *
     * @return the singleton instance
     */
    public static ZkConfig getConfig() {
        return config;
    }

    // Running this class only triggers the static initializer below.
    public static void main(String[] args) {
        System.out.println(1);
    }

    // Load the properties as soon as the class is initialized.
    static {
        config.loadPropertiesFromSrc();
    }

    public void loadPropertiesFromSrc() {
        InputStream in = null;
        try {
            in = ZkConfig.class.getClassLoader().getResourceAsStream(FILE_NAME);
            if (null != in) {
                properties = new Properties();
                properties.load(in);
                loadProperties(properties);
            } else {
                // Message: the properties file was not found in the classpath directory
                logger.error(FILE_NAME + "属性文件未能在classpath指定的目录下 " + ZkConfig.class.getClassLoader().getResource("").getPath() + " 找到!");
            }
        } catch (Exception e) {
            logger.error("ZkConfig.getZkConfig error:", e);
        } finally {
            if (null != in) {
                try {
                    in.close();
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Sets the configuration fields from the given {@link Properties} object.
     *
     * @param pro the loaded properties
     */
    public void loadProperties(Properties pro) {
        // Message: start loading configuration items from the properties file
        logger.info("开始从属性文件中加载配置项");
        String value = null;
        value = pro.getProperty(zk_server);
        if (!ZkUtil.isEmpty(value)) {
            this.server = value.trim();
            // Message: configuration item "server ip" loaded
            logger.info("配置项:服务器ip==>" + zk_server + "==>" + value + " 已加载");
        }
        value = pro.getProperty(zk_authentication_type);
        if (!ZkUtil.isEmpty(value)) {
            this.authenticationType = value.trim();
            // Message: configuration item "authentication type" loaded
            logger.info("配置项:验证类型==>" + zk_authentication_type + " 已加载");
        }
        value = pro.getProperty(zk_correct_authentication);
        if (!ZkUtil.isEmpty(value)) {
            this.correctAuthentication = value.trim();
            // Message: configuration item "correct password" loaded
            logger.info("配置项:正确的密码==>" + zk_correct_authentication + " 已加载");
        }
        value = pro.getProperty(zk_bad_authentication);
        if (!ZkUtil.isEmpty(value)) {
            this.badAuthentication = value.trim();
            // Message: configuration item "wrong password" loaded
            logger.info("配置项:错误的密码==>" + zk_bad_authentication + " 已加载");
        }
    }

    public static Logger getLogger() {
        return logger;
    }

    public static String getFileName() {
        return FILE_NAME;
    }

    public String getServer() {
        return server;
    }

    public void setServer(String server) {
        this.server = server;
    }

    public String getAuthenticationType() {
        return authenticationType;
    }

    public void setAuthenticationType(String authenticationType) {
        this.authenticationType = authenticationType;
    }

    public String getCorrectAuthentication() {
        return correctAuthentication;
    }

    public void setCorrectAuthentication(String correctAuthentication) {
        this.correctAuthentication = correctAuthentication;
    }

    public String getBadAuthentication() {
        return badAuthentication;
    }

    public void setBadAuthentication(String badAuthentication) {
        this.badAuthentication = badAuthentication;
    }

    public static String getZk_server() {
        return zk_server;
    }

    public static String getZk_authentication_type() {
        return zk_authentication_type;
    }

    public static String getZk_correct_authentication() {
        return zk_correct_authentication;
    }

    public static String getZk_bad_authentication() {
        return zk_bad_authentication;
    }

    public static void setConfig(ZkConfig config) {
        ZkConfig.config = config;
    }
}
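The stream handling in loadPropertiesFromSrc can be written more compactly with try-with-resources, which closes the stream without an explicit finally block. This is only a sketch of that alternative, not the author's code: PropsLoader and its three methods are hypothetical names, and unlike ZkConfig it throws when the file is missing instead of logging.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class PropsLoader {

    /** Reads key=value pairs from the stream and closes the stream afterwards. */
    public static Properties load(InputStream source) {
        Properties props = new Properties();
        try (InputStream in = source) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Looks the file up on the classpath and fails loudly when it is missing. */
    public static Properties loadFromClasspath(String fileName) {
        InputStream in = PropsLoader.class.getClassLoader().getResourceAsStream(fileName);
        if (in == null) {
            throw new IllegalStateException(fileName + " not found on the classpath");
        }
        return load(in);
    }

    /** Returns the trimmed value, or null when the key is absent or blank. */
    public static String trimmedOrNull(Properties props, String key) {
        String value = props.getProperty(key);
        return (value == null || value.trim().isEmpty()) ? null : value.trim();
    }

    public static void main(String[] args) {
        // In-memory stand-in for zk.properties so the sketch runs without a file.
        byte[] content = "zk.server=192.168.2.111:2181\nzk.authentication_type=digest\n"
                .getBytes(StandardCharsets.ISO_8859_1);
        Properties props = load(new ByteArrayInputStream(content));
        System.out.println(trimmedOrNull(props, "zk.server"));
        System.out.println(trimmedOrNull(props, "zk.authentication_type"));
    }
}
```

In a project laid out like the one in this article, loadFromClasspath("zk.properties") would take the place of the in-memory stream used in main.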
ZkUtil.java, a utility class
package org.seckill.zk.util;

/**
 * @author 12084
 * @create 2018-07-17 9:09
 */
public class ZkUtil {
    /**
     * Checks whether a string is null or empty (after trimming).
     *
     * @param s the string to check
     * @return true if the string is null or blank, false otherwise
     */
    public static boolean isEmpty(String s) {
        return null == s || "".equals(s.trim());
    }
}
ZkAuth.java
package org.seckill.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.seckill.zk.util.ZkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Creates a node with CREATOR_ALL_ACL and then accesses it without
 * credentials, with the wrong key and with the correct key.
 *
 * @author 12084
 * @create 2018-07-16 15:59
 */
public class ZkAuth implements Watcher {
    public static final Logger logger = LoggerFactory.getLogger(ZkAuth.class);
    final static String PATH = "/auth_test";
    final static String PATH_DEL = "/auth_test/del_node";
    static ZooKeeper zk = null;
    AtomicInteger seq = new AtomicInteger();

    public void process(WatchedEvent event) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (event == null) {
            return;
        }
        // Connection state
        Event.KeeperState keeperState = event.getState();
        // Event type
        Event.EventType eventType = event.getType();
        String logPrefix = "【Watcher-" + seq.incrementAndGet() + "】";
        logger.info(logPrefix + "收到Watcher通知");                        // watcher notification received
        logger.info(logPrefix + "连接状态: " + keeperState.toString());  // connection state
        logger.info(logPrefix + "事件类型: " + eventType.toString());    // event type
        if (Event.KeeperState.SyncConnected == keeperState) {
            // Successfully connected to the ZK server
            if (Event.EventType.None == eventType) {
                logger.info(logPrefix + "成功连接上ZK服务器");
            }
        } else if (Event.KeeperState.Disconnected == keeperState) {
            logger.info(logPrefix + "与ZK服务器断开连接");   // disconnected from the ZK server
        } else if (Event.KeeperState.AuthFailed == keeperState) {
            logger.info(logPrefix + "权限检查失败");         // permission check failed
        } else if (Event.KeeperState.Expired == keeperState) {
            logger.info(logPrefix + "会话失效");             // session expired
        }
        logger.info("--------------------------------------------");
    }

    /**
     * Creates the ZK connection.
     *
     * @param connectString  list of ZK server addresses
     * @param sessionTimeout session timeout in milliseconds
     */
    public void createConnection(String connectString, int sessionTimeout) {
        releaseConnection();
        try {
            zk = new ZooKeeper(connectString, sessionTimeout, this);
            // Attach the credentials to this session
            zk.addAuthInfo(ZkConfig.getConfig().getAuthenticationType(), ZkConfig.getConfig().getCorrectAuthentication().getBytes());
            logger.info("开始连接ZK服务器....");   // start connecting to the ZK server
            // Poll until the session is connected
            while (zk.getState() != ZooKeeper.States.CONNECTED) {
                Thread.sleep(3000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the ZK connection.
     */
    public void releaseConnection() {
        closeQuietly(zk);
    }

    /**
     * Closes a client if it was created; used for the temporary clients below
     * so that their sessions are not leaked.
     */
    static void closeQuietly(ZooKeeper client) {
        if (client != null) {
            try {
                client.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Reads data without any credentials.
     */
    static void getDataByNoAuthentication() {
        String prefix = "[不使用任何授权信息]";   // [no credentials]
        logger.info(prefix + "获取数据:" + PATH);
        ZooKeeper noZk = null;
        try {
            noZk = new ZooKeeper(ZkConfig.getConfig().getServer(), 2000, null);
            Thread.sleep(4000);
            // getData returns byte[]; convert it before logging
            logger.info(prefix + "成功获取数据:" + new String(noZk.getData(PATH, false, null)));
        } catch (Exception e) {
            logger.error(prefix + "获取数据失败,原因:" + e.getMessage());
        } finally {
            closeQuietly(noZk);
        }
    }

    /**
     * Reads data with the wrong key.
     */
    static void getDataByBadAuthentication() {
        String prefix = "[使用错误的授权信息]";   // [wrong credentials]
        ZooKeeper badzk = null;
        try {
            badzk = new ZooKeeper(ZkConfig.getConfig().getServer(), 2000, null);
            Thread.sleep(4000);
            // Attach the wrong credentials
            badzk.addAuthInfo(ZkConfig.getConfig().getAuthenticationType(), ZkConfig.getConfig().getBadAuthentication().getBytes());
            logger.info(prefix + "获取数据:" + PATH);
            logger.info(prefix + "成功获取数据:" + new String(badzk.getData(PATH, false, null)));
        } catch (Exception e) {
            logger.error(prefix + "获取数据失败,原因:" + e.getMessage());
        } finally {
            closeQuietly(badzk);
        }
    }

    /**
     * Reads data with the correct key (the shared, already authenticated client).
     */
    static void getDataByCorrectAuthentication() {
        String prefix = "[使用正确的授权信息]";   // [correct credentials]
        try {
            logger.info(prefix + "获取数据:" + PATH);
            logger.warn(prefix + "成功获取数据:" + new String(zk.getData(PATH, false, null)));
        } catch (Exception e) {
            logger.error(prefix + "获取数据失败,原因:" + e.getMessage());
        }
    }

    /**
     * Updates data without any credentials.
     */
    static void updateDataByNoAuthentication() {
        String prefix = "[不使用任何授权信息]";
        logger.info(prefix + "更新数据:" + PATH);
        ZooKeeper nozk = null;
        try {
            nozk = new ZooKeeper(ZkConfig.getConfig().getServer(), 2000, null);
            Thread.sleep(4000);
            Stat stat = nozk.exists(PATH, false);
            if (stat != null) {
                nozk.setData(PATH, prefix.getBytes(), -1);
                logger.info(prefix + "更新成功");
            }
        } catch (Exception e) {
            logger.error(prefix + "更新失败,原因是:" + e.getMessage());
        } finally {
            closeQuietly(nozk);
        }
    }

    /**
     * Updates data with the wrong key.
     */
    static void updateDataByBadAuthentication() {
        String prefix = "[使用错误的授权信息]";
        logger.info(prefix + "更新数据:" + PATH);
        ZooKeeper badzk = null;
        try {
            badzk = new ZooKeeper(ZkConfig.getConfig().getServer(), 2000, null);
            // Attach the wrong credentials (the original passed the correct key here by mistake)
            badzk.addAuthInfo(ZkConfig.getConfig().getAuthenticationType(), ZkConfig.getConfig().getBadAuthentication().getBytes());
            Stat exists = badzk.exists(PATH, false);
            if (exists != null) {
                badzk.setData(PATH, prefix.getBytes(), -1);
                logger.info(prefix + "更新成功");
            }
        } catch (Exception e) {
            logger.error(prefix + "更新失败,原因是:" + e.getMessage());
        } finally {
            closeQuietly(badzk);
        }
    }

    /**
     * Updates data with the correct key.
     */
    static void updateDataByCorrectAuthentication() {
        String prefix = "[使用正确的授权信息]";
        logger.info(prefix + "更新数据:" + PATH);
        try {
            Stat stat = zk.exists(PATH, false);
            if (stat != null) {
                zk.setData(PATH, prefix.getBytes(), -1);
                logger.info(prefix + "更新成功");
            }
        } catch (Exception e) {
            logger.error(prefix + "更新失败,原因是:" + e.getMessage());
        }
    }

    /**
     * Removes the test nodes left over from a previous run (child first, then parent).
     */
    static void deleteParent() throws Exception {
        Stat stat = zk.exists(PATH_DEL, false);
        if (stat != null) {
            zk.delete(PATH_DEL, -1);
            zk.delete(PATH, -1);
            return;
        }
        Stat stat2 = zk.exists(PATH, false);
        if (stat2 != null) {
            zk.delete(PATH, -1);
        }
    }

    public static void main(String[] args) {
        ZkAuth zkAuth = new ZkAuth();
        try {
            zkAuth.createConnection(ZkConfig.getConfig().getServer(), 5000);
            deleteParent();
            zk.create(PATH, "init content".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
            // zk.create(PATH_DEL, "will be deleted!".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
            logger.info("客户端开始访问-----------------------------------------------");   // client starts accessing
            // Read the data in three ways
            getDataByNoAuthentication();
            getDataByBadAuthentication();
            getDataByCorrectAuthentication();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the connection
            zkAuth.releaseConnection();
        }
    }
}
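createConnection waits for the session by polling zk.getState() every three seconds. A commonly used alternative is to let the watcher release a CountDownLatch when it sees SyncConnected and to wait on that latch with a timeout. The sketch below shows only the latch part with the JDK, so it runs without a ZooKeeper server; ConnectionGate is a hypothetical helper, and wiring markConnected() into the process method of ZkAuth is an assumption, not something the original code does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ConnectionGate {
    private final CountDownLatch connected = new CountDownLatch(1);

    /** To be called from the watcher callback once the session is connected. */
    public void markConnected() {
        connected.countDown();
    }

    /**
     * Blocks until markConnected() has been called or the timeout elapses.
     * Returns false on timeout or when the waiting thread is interrupted.
     */
    public boolean awaitConnected(long timeoutMillis) {
        try {
            return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ConnectionGate gate = new ConnectionGate();
        // Stand-in for the ZooKeeper event thread delivering the connected event.
        Thread eventThread = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            gate.markConnected();
        });
        eventThread.start();
        System.out.println("connected in time: " + gate.awaitConnected(2000));
    }
}
```

Compared with the polling loop, the caller resumes as soon as the event arrives and gets an explicit result when the server cannot be reached within the timeout.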
- Results
- Start the two Linux servers
- Start ZooKeeper
- Do the same on the other server
- Run the Java code
- Run log:
Connected to the target VM, address: '127.0.0.1:52280', transport: 'socket'
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/Maven/repo/ch/qos/logback/logback-classic/1.1.1/logback-classic-1.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/Maven/repo/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
13:56:20,355 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
13:56:20,356 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
13:56:20,370 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [file:/D:/workspace/maven/seckill/target/classes/logback.xml]
13:56:20,527 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
13:56:20,531 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
13:56:20,559 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
13:56:20,617 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to DEBUG
13:56:20,617 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
13:56:20,618 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
13:56:20,620 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@… - Registering current configuration as safe fallback point
13:56:20.626 [main] INFO org.seckill.zk.util.ZkConfig - 开始从属性文件中加载配置项
13:56:20.634 [main] INFO org.seckill.zk.util.ZkConfig - 配置项:服务器ip==>zk.server==>192.168.2.111:2181 已加载
13:56:20.634 [main] INFO org.seckill.zk.util.ZkConfig - 配置项:验证类型==>zk.authentication_type 已加载
13:56:20.634 [main] INFO org.seckill.zk.util.ZkConfig - 配置项:正确的密码==>zk.correct_authentication 已加载
13:56:20.634 [main] INFO org.seckill.zk.util.ZkConfig - 配置项:错误的密码==>zk.bad_authentication 已加载
13:56:20.645 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
13:56:20.645 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:host.name=windows10.microdone.cn
13:56:20.645 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_11
13:56:20.646 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle Corporation
13:56:20.646 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.home=C:\Program Files\Java\jdk1.8.0_11\jre
[java.class.path: the start of this entry is cut off in the source. The visible part lists these jars, mostly under D:\Maven\repo: slf4j-api-1.7.12, logback-core-1.1.1, logback-classic-1.1.1, mysql-connector-java-5.1.35, c3p0-0.9.1.2, mybatis-3.3.0, mybatis-spring-1.2.3, standard-1.1.2, jstl-1.2, jackson-databind-2.5.4, jackson-annotations-2.5.0, jackson-core-2.5.4, javax.servlet-api-3.1.0, spring-core, spring-beans, spring-context, spring-aop, spring-expression, spring-jdbc, spring-tx, spring-web, spring-webmvc and spring-test (all 4.1.7.RELEASE), commons-logging-1.2, aopalliance-1.0, jedis-2.7.3, commons-pool2-2.3, protostuff-core, protostuff-api, protostuff-runtime and protostuff-collectionschema (all 1.0.8), commons-collections-3.2, zookeeper-3.4.6, slf4j-log4j12-1.6.1, log4j-1.2.16, netty-3.7.0.Final, jline-0.9.94, plus IntelliJ IDEA's idea_rt.jar and debugger-agent.jar]
13:56:20.648 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.library.path=C:\Program Files\Java\jdk1.8.0_11\bin;C:\WINDOWS\Sun\Java\bin;C:\WINDOWS\system32;C:\WINDOWS;C:\ProgramData\Oracle\Java\javapath;C:\WINDOWS\system32;C:\WINDOWS;C:\WINDOWS\System32\Wbem;C:\WINDOWS\System32\WindowsPowerShell\v1.0;C:\Program Files\TortoiseSVN\bin;C:\Program Files\SlikSvn\bin;D:\Maven\apache-maven-3.3.9\bin;D:\install\MySQL\MySQL Server 5.1\bin;C:\Program Files\Java\jdk1.8.0_11\bin;D:\Git\Git\cmd;C:\WINDOWS\System32\OpenSSH;C:\Users\12084\AppData\Local\Microsoft\WindowsApps;%USERPROFILE%\AppData\Local\Microsoft\WindowsApps;;.
13:56:20.648 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=C:\Users\12084\AppData\Local\Temp
13:56:20.648 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=
Process finished with exit code 0
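- Details: reading /auth_test fails with NoAuth both without credentials and with the wrong key, and succeeds with the correct key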
13:56:20.626 [main] INFO org.seckill.zk.util.ZkConfig - 开始从属性文件中加载配置项
13:56:20.634 [main] INFO org.seckill.zk.util.ZkConfig - 配置项:服务器ip==>zk.server==>192.168.2.111:2181 已加载
13:56:20.634 [main] INFO org.seckill.zk.util.ZkConfig - 配置项:验证类型==>zk.authentication_type 已加载
13:56:20.634 [main] INFO org.seckill.zk.util.ZkConfig - 配置项:正确的密码==>zk.correct_authentication 已加载
13:56:20.634 [main] INFO org.seckill.zk.util.ZkConfig - 配置项:错误的密码==>zk.bad_authentication 已加载
13:56:20.680 [main] INFO org.seckill.zk.ZkAuth - 开始连接ZK服务器....
13:56:20.910 [main-EventThread] INFO org.seckill.zk.ZkAuth - 【Watcher-1】收到Watcher通知
13:56:20.911 [main-EventThread] INFO org.seckill.zk.ZkAuth - 【Watcher-1】连接状态: SyncConnected
13:56:20.911 [main-EventThread] INFO org.seckill.zk.ZkAuth - 【Watcher-1】事件类型: None
13:56:20.911 [main-EventThread] INFO org.seckill.zk.ZkAuth - 【Watcher-1】成功连接上ZK服务器
13:56:23.714 [main] INFO org.seckill.zk.ZkAuth - 客户端开始访问-----------------------------------------------
13:56:23.715 [main] INFO org.seckill.zk.ZkAuth - [不使用任何授权信息]获取数据:/auth_test
13:56:27.734 [main] ERROR org.seckill.zk.ZkAuth - [不使用任何授权信息]获取数据失败,原因:KeeperErrorCode = NoAuth for /auth_test
13:56:31.738 [main] ERROR org.seckill.zk.ZkAuth - [使用错误的授权信息]获取数据失败,原因:KeeperErrorCode = NoAuth for /auth_test
13:56:31.754 [main] WARN org.seckill.zk.ZkAuth - [使用正确的授权信息]成功获取数据:init content