「大数据」(三十五)ZooKeeper之应用构建

Posted 数据仓库

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了「大数据」(三十五)ZooKeeper之应用构建相关的知识,希望对你有一定的参考价值。

【导读:数据是二十一世纪的石油,蕴含巨大价值,这是·情报通·大数据技术系列第[35]篇文章,欢迎阅读和收藏】

基本概念

Zookeeper 分布式服务框架是 Apache Hadoop 的一个子项目, ZooKeeper 框架支持许多当今最好的工业应用程序,且在越来越多的分布式系统( Hadoop 、 HBase 、 Solr )中, Zookeeper 都作为核心组件使用。它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:分布式命名服务、配置管理、负载均衡、集群管理、分布式锁、分布式队列管理等。

配置管理是分布式应用所需要的基本服务之一,它使集群中的机器可以共享配置信息中那些公共的部分。简单地说, ZooKeeper 可以作为一个具有高可用性的配置存储器,允许分布式应用的参与者检索和更新配置文件。使用 ZooKeeper 中的观察机制,可以建立一个活跃的配置服务,使那些感兴趣的客户端能够获得配置信息修改的通知。

把这些配置全部放到 zookeeper 上,保存在 Zookeeper 的某个目录节点中,然后所有相关应用程序对这个目录节点进行监听,一旦配置信息发生变化,每个应用程序就会收到 Zookeeper 的通知,然后从 Zookeeper 获取新的配置信息应用到系统中就好。

术语解释

ZooKeeper :是一个分布式的,开放源码的分布式应用程序协调服务。它是以 Fast Paxos 算法为基础,实现同步服务,配置维护和命名服务等分布式应用。简单来说, ZooKeeper= 文件管理 + 通知机制。

Hadoop :Hadoop 的核心是 HDFS ( Hadoop Distributed File System )和 MapReduce ,分别提供了对海量数据的存储和计算能力。Hadoop 依靠 ZooKeeper 进行配置管理和协调。

HBase :是一个开源的,分布式的 NoSQL 数据库,用于大型数据集的实时读 / 写访问。Apache HBase 使用 ZooKeeper 通过集中式配置管理和分布式互斥机制来帮助主机和区域服务器跟踪分布式数据的状态。

Solr :是一个用 Java 编写的快速,开源的搜索平台。Solr 广泛使用 ZooKeeper 的每个功能,如配置管理, leader 选举,节点管理,数据锁定和同步化。

详细说明

下面基于 Zookeeper 来实现统一配置管理。

解决思路:

)把公共配置抽取出来

)对公共配置进行维护

)修改公共配置后应用不需要重新部署

采用方案:

)公共配置抽取存放于 zookeeper 中并落地数据库

)对公共配置修改后发布到 zookeeper 中并落地数据库

)对应用开启配置实时监听, zookeeper 配置文件一旦被修改,应用可实时监听到并获取

需要用到的 jar 是 zkclient

配置文件 Config

package com.cwh.zk.util;

import java.io.Serializable;

public class Config implements Serializable{

private static final long serialVersionUID = 1L;

private String userNm;

private String userPw;

public Config() {

}

public Config(String userNm, String userPw) {

this.userNm = userNm;

this.userPw = userPw;

}

public String getUserNm() {

return userNm;

}

public void setUserNm(String userNm) {

this.userNm = userNm;

}

public String getUserPw() {

return userPw;

}

public void setUserPw(String userPw) {

this.userPw = userPw;

}

@Override

public String toString() {

return "Config [userNm=" + userNm + ", userPw=" + userPw + "]";

}

}

配置管理中心 ZkConfigMag

package com.cwh.zk.util;

import org.I0Itec.zkclient.ZkClient;

public class ZkConfigMag {

private Config config;

/**

从数据库加载配置

*/

public Config downLoadConfigFromDB(){

//getDB

config = new Config("nm", "pw");

return config;

}

/**

配置文件上传到数据库

*/

public void upLoadConfigToDB(String nm, String pw){

if(config==null)config = new Config();

config.setUserNm(nm);

config.setUserPw(pw);

//updateDB

}

/**

配置文件同步到 zookeeper

*/

public void syncConfigToZk(){

ZkClient zk = new ZkClient("localhost:2181");

if(!zk.exists("/zkConfig")){

zk.createPersistent("/zkConfig",true);

}

zk.writeData("/zkConfig", config);

zk.close();

}

}

应用监听实现 ZkGetConfigClient

package com.cwh.zk.util;

import org.I0Itec.zkclient.IZkDataListener;

import org.I0Itec.zkclient.ZkClient;

public class ZkGetConfigClient {

private Config config;

public Config getConfig() {

ZkClient zk = new ZkClient("localhost:2181");

config = (Config)zk.readData("/zkConfig");

System.out.println(" 加载到配置: "+config.toString());

// 监听配置文件修改

zk.subscribeDataChanges("/zkConfig", new IZkDataListener(){

@Override

public void handleDataChange(String arg0, Object arg1)

throws Exception {

config = (Config) arg1;

System.out.println(" 监听到配置文件被修改: "+config.toString());

}

@Override

public void handleDataDeleted(String arg0) throws Exception {

config = null;

System.out.println(" 监听到配置文件被删除 ");

}

});

return config;

}

public static void main(String[] args) {

ZkGetConfigClient client = new ZkGetConfigClient();

client.getConfig();

System.out.println(client.config.toString());

for(int i = 0;i<10;i++){

System.out.println(client.config.toString());

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

}

测试,启动配置管理中心

package com.cwh.zkConfig.test;

import com.cwh.zk.util.Config;

import com.cwh.zk.util.ZkConfigMag;

public class ZkConfigTest {

public static void main(String[] args) {

ZkConfigMag mag = new ZkConfigMag();

Config config = mag.downLoadConfigFromDB();

System.out.println(".... 加载数据库配置 ...."+config.toString());

mag.syncConfigToZk();

System.out.println(".... 同步配置文件到 zookeeper....");

// 歇会,这样看比较清晰

try {

Thread.sleep(10000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

mag.upLoadConfigToDB("cwhcc", "passwordcc");

System.out.println(".... 修改配置文件 ...."+config.toString());

mag.syncConfigToZk();

System.out.println(".... 同步配置文件到 zookeeper....");

}

}

以上是关于「大数据」(三十五)ZooKeeper之应用构建的主要内容,如果未能解决你的问题,请参考以下文章

「大数据」(三十四)ZooKeeper之服务简介

打怪升级之小白的大数据之旅(三十五)<JDBC的扩展知识点>

「大数据」(三十三)Zookeeper之选举机制

DT大数据梦工厂第三十五课 Spark系统运行循环流程

大数据框架开发基础之Zookeeper入门

2021年大数据Flink(三十五):​​​​​​​Table与SQL ​​​​​​案例二