ZK节点间数据同步以及API实践

Posted 青山师

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZK节点间数据同步以及API实践相关的知识,希望对你有一定的参考价值。

文章目录

ZK节点间数据同步以及API实践

Apache ZooKeeper(以下简称ZK)是一个分布式的协调服务,为分布式系统提供了一致性、可靠性和高性能。ZK可以用于实现诸如统一命名服务、配置管理、分布式锁等功能。其中,ZK集群在实现这些功能时起着至关重要的作用。

在ZK集群中,每个节点都保存有相同的数据副本。当某个节点的数据发生变化时,需要将这个变化同步给其他节点,以确保所有节点的数据始终保持一致。本文将介绍在ZK集群中节点间如何进行数据同步。

数据同步原理

在ZK集群中,节点通过Leader选举机制来选出一个Leader节点,由Leader节点进行数据同步控制。每个Follower节点都与Leader节点保持连接,在Leader节点接收到客户端的写请求时,会先将请求写入本地日志,然后向所有Follower节点发送该条写请求。Follower节点接收到写请求后也会将该请求写入本地日志。只有当大多数节点(即大多数副本)完成了该写请求操作并确认提交,Leader节点才会将该请求标记为已提交,并向所有Follower节点发送提交指令。Follower节点接收到该指令后,将在本地提交该写请求并返回成功响应给Leader节点。

当某个节点发生故障时,ZK会自动选举新的Leader节点,并将Leader节点的数据同步给新节点,以确保数据不丢失。

实践操作

下面我们通过实践来演示ZK集群中节点间数据同步的过程。我们将使用Java编写一个简单的程序,包含三个部分:创建节点、更新节点、删除节点。该程序将连接到ZK集群,进行数据操作,并输出相关的日志信息。

环境准备

在开始之前,需要先搭建好ZK集群环境。这里不再赘述,假设已经有了一个包含3个节点的ZK集群,并且它们都已经启动。

为了连接到ZK集群,我们需要引入以下依赖:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.3</version>
</dependency>

创建节点

我们首先编写一个程序,在ZK集群上创建一个节点,并输出日志信息。代码如下:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class CreateNode 
    public static void main(String[] args) throws Exception 
        String connectString = "localhost:2181,localhost:2182,localhost:2183";
        int sessionTimeout = 5000;
        ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() 
            public void process(WatchedEvent event) 
                System.out.println("Received event: " + event);
            
        );
        String path = "/test";
        byte[] data = "hello world".getBytes();
        zk.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("Created node: " + path);
        zk.close();
    

我们在程序中指定了ZK集群的连接地址(localhost:2181,localhost:2182,localhost:2183)、会话超时时间(5000毫秒)、节点路径(/test)以及节点数据(hello world)。运行该程序,可以看到输出的日志信息:

Received event: WatchedEvent state:SyncConnected type:None path:null
Created node: /test

这表明程序已经成功连接到ZK集群,并且在节点/test下创建了一个名为test的节点,节点数据为hello world。

更新节点

接下来,我们编写一个程序,在ZK集群上更新一个节点,并输出日志信息。代码如下:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class UpdateNode 
    public static void main(String[] args) throws Exception 
        String connectString = "localhost:2181,localhost:2182,localhost:2183";
        int sessionTimeout = 5000;
        ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() 
            public void process(WatchedEvent event) 
                System.out.println("Received event: " + event);
            
        );
        String path = "/test";
        byte[] data = "hello zookeeper".getBytes();
        zk.setData(path, data, -1);
        System.out.println("Updated node: " + path);
        zk.close();
    

我们在程序中指定了ZK集群的连接地址、会话超时时间和节点路径(与上一步创建的节点一致)。运行该程序,可以看到输出的日志信息:

Received event: WatchedEvent state:SyncConnected type:None path:null
Received event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/test
Updated node: /test

这表明程序已经成功连接到ZK集群,并且将节点/test的数据由hello world更新为hello zookeeper。

删除节点

最后,我们编写一个程序,在ZK集群上删除一个节点,并输出日志信息。代码如下:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class DeleteNode 
    public static void main(String[] args) throws Exception 
        String connectString = "localhost:2181,localhost:2182,localhost:2183";
        int sessionTimeout = 5000;
        ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() 
            public void process(WatchedEvent event) 
                System.out.println("Received event: " + event);
            
        );
        String path = "/test";
        zk.delete(path, -1);
        System.out.println("Deleted node: " + path);
        zk.close();
    

我们在程序中指定了ZK集群的连接地址、会话超时时间和节点路径(与上一步更新的节点一致)。运行该程序,可以看到输出的日志信息:

Received event: WatchedEvent state:SyncConnected type:None path:null
Received event: WatchedEvent state:SyncConnected type:NodeDeleted path:/test
Deleted node: /test

这表明程序已经成功连接到ZK集群,并且删除了节点/test。

总结

本文介绍了ZK集群中节点间数据同步的原理,并通过实践演示了如何使用Java编写程序,在ZK集群上进行节点的创建、更新和删除操作。在实际应用中,我们可以借助ZK提供的API,快速实现分布式系统的各种功能。同时,由于ZK使用的是副本机制,故障节点的自动恢复也可以确保数据的可靠性和一致性。

Zookeeper的基本原理(zk架构zk存储结构watch机制独立安装zk集群间同步复制)

1、Hbase集群的高可用性与伸缩性

  HBase可以实现对Regionserver的监控,当个别Regionserver不可访问时,将其负责的分区分给其他Regionsever,其转移过程较快,因为只需要将分区的相关信息转移。Hlog和表中数据实际存储在HDFS上,本身具有多副本机制容错。

  Master节点以及HDFS中的Namenode节点,如果只部署一个,可能造成单点故障,可以依托Zookeeper实现这两种关系主节点的高可用性配置。

Zookeeper实现的方法是:部署多个Master或Namenode,并区别为活跃节点和待命节点。当活跃节点发生故障后,待命节点自动提升为活跃节点。由Zookeeper负责监控活跃节点、选择新的活跃节点等功能。

  • 活跃节点:接收读写操作。
  • 待命节点:从活跃节点实现同步数据。

HBase还可以通过集群间的同步机制,实现(列族)数据的分布式热备份。

2、Zookpeeper的基本原理

场景:为了实现分布式服务中的数据同步与一致性、群组管理监控、分布投票协商与锁(如分布式事务中的二阶段提交)、命令寻址等,需要在集群中提供分布式协调服务。

Zookeeper:一款提供分布式协调服务的开源软件。Zookeeper以Fast Paxos算法为基础,提供分布式协调、选举和锁服务,并基于此扩展出配置维护、组服务、分布式消息队列、分布式通知/协调等功能。

2.1 Zookeeper架构

Zookeeper采用集群化部署方式,一般部署在多台服务器上,以防止单点失效。(一般采用奇数台服务器部署,目的是用于投票时收敛)

服务器可以看做是proposer和acceptor(Zookeeper称为follower)角色的综合。

  • 1.在服务器中选出leader进行提案和主持投票,如果leader失效,就重新选举。选举可以通过paxos或fastpaxos进行。
  • 2. 理论上全体服务器可以对提案投票,但为了控制网络开销,一般选择部分节点进行投票,而其他节点称为observer,只观察结果进行同步。

技术图片 

 PS:Zookeeper会在集群内同步数据,并通过所谓的watch机制实现对数据更新的监控,因此客户端可以连接到任何一台服务器来监测数据变化,以实现客户端集群内数据的最终一致性、原子性和顺序性等保障。

Zookeeper提供了分布式独享锁、选举、队列等API接口,可以实现配置同步、集群管理等协调性服务。

2.2 Znode存储结构

Zookeeper采用层次化目录结构存储数据。目录 ==节点,在zookeeper中称为Znode。

Znode的特点:

  • 一次写入、多次读取,数据写入后不可更改。
  • 可以存储多个版本的数据,以实现更新顺序性。
  • 一次性读取整个Znode,不支持部分读取。
  • 根据数据的生命周期,具有4种节点,在创建时确定并且不能再修改。
    • 临时节点(EPHEMERAL):不支持子节点,会在客户端会话结束时被删除。
    • 临时顺序节点(EPHEMERAL_SEQUENTIAL):临时节点,但父节点会为一级子节点记录创建时间,记录节点的创建顺序。
    • 持久节点(PERSISTENT):持久存储,一般根据客户端需求删除。
    • 持久顺序节点(PERSISTENT_SEQUENTIAL):持久节点,但父节点会为一级子节点记录创建时间,记录节点的创建顺序。

技术图片 

2.3 Watch机制

客户端可以通过watch机制关注Znode的信息变化,实现配置管理、数据同步和分布式锁等功能。

客户端首先需要注册一个watch,来观察某个Znode。当出现数据更新或被删除、子节点发生变化等情况时,Zookeeper集群会通过异步消息向客户端发送事件通知。通知发送后,该watcher就会失效,如果此时再发送信息变化,客户端就无法获取新的通知,除非客户端在进行新的注册。可见,watch机制能够确保消息的顺序性(旧消息被接收之前,客户端无法获得新消息)以及最终一致性,但无法确保所有的数据变化都能够被观察到。

2.4 Hadoop和HBase对Zookeeper的利用

Hadoop在默认情况下并不会使用Zookeeper。

场景:HDFS和MapReduce采用主从结构,由主节点进行集群管理,子节点之间不会自主进行协调、同步和监控等。这种结构使得子节点易于横向扩展,但主节点存在单点故障问题,即主节点宕机后,整个集群就失效了。Hadoop可以通过Zookeeper的协调能力实现主节点的高可用性,来解决这一个问题。

HBase中自带Zookeeper服务,但也可以禁用后选择外部独立安装的Zookeeper为其提供服务。HBase利用Zookeeper实现Regionserver监控、多Master高可用性管理以及META表入口存储等功能。

3、基于Zookeeper的高可用性

由于HDFS采用分布式部署和数据多副本机制,因此当出现少量Datanode故障或少量机架故障时,并不会出现数据损失或数据处理任务失败等情况。但Namenode角色在集群中只有一个,因此存在单点故障风险。HDFS的NamenodeHA(高可用性,High Available)方案:

技术图片

该方案中存在2个Namenode,其中Active Namenode向集群提供服务,Stanby Namenode为待命状态。Fsimage等信息存储在一个可共享的网络位置,Active Namenode写入数据,Stanby Namenode则只能读取,以防Fsimage出现多版本不一致的情况,即脑裂(brainsplit)。

Zookeeper分布式协调服务器可以监控Active Namenode的健康状态,当发送故障时,通过分布式选举机制将Stanby Namenode提升为Active Namenode,如果原Active Namenode从故障中恢复,则自动降级Stanby Namenode。整个过程自动进行,不需要人工干预。

HBase Hmaster 的高可用性原理与此类似,且在HBase中配置Master高可用性较为简单,只需要在各节点的配置目录下建立一个文本文件,命名为backup-masters,并在其中每行写入一个主机名,即可完成配置。

//当执行start-hbase.sh命令时,直接在作为Master节点的主机上
hbase-daemon.sh start master 

4、独立安装Zookeeper

Zookeeper的基本安装步骤:

  1. 软件部署
  2. 软件配置
  3. 启动与使用方法
  4. HBase使用独立的Zookeeper服务

4.1 软件部署

  独立安装Zookeeper是为向Hadoop和HBase共同提供服务的,Zookeeper对操作系统、软件和网络环境的要求与这些组件基本一致,能够安装Hadoop和HBase的节点也可以直接安装Zookeeper。下载Zookeeper之后,可以得到一个压缩包,将其解压到权限合适的目录即可完成基本部署。其中bin目录下为可执行命令,conf目录下为配置文件,lib目录下为对应的库包。

4.2 软件配置

  Zookeeper的配置文件为zoo.cfg,纯文本格式。

PS:三台服务器的zoo.cfg文件配置内容完全一致,但在每台服务器的conf下还需要各建立一个myid文件,该文件为纯文本文件,内容分别为1,2,3,对应下文配置的id与本机IP地址的关系。

//假设需要在三台服务器上部署Zookeeper,配置文件
tickTime =2000
dataDir = /zookeeper /data
dataLogDir = /zookeeper/logs
clientPort = 2181
initLimit = 5
syncLimit = 2
server.1=192.168.10.1:2888:3888
server.2=192.168.10.2:2888:3888
server.3=192.168.10.3:2888:3888

//tickTime=2000 配置内容为超时时间的基础值,单位为毫秒。
//initLimit=5  配置follower与leader之间建立连接后进行同步的最长时间,数值5表示实际时间为5*tickTime。
//syncLimit=5 为leader和follower之间等待回复消息的最大时间,数值2表示2*tickTime。
//dataDir=/tmp/zookeeper  表示zookeeper服务器存储快照文件的目录
//dataLogDir 表示配置 zookeeper事务日志的存储路径,默认指定在dataDir目录下 
//clientPort 表示客户端和服务端建立连接的端口号: 2181,为Znode的访问入口地址。
//server.id=host:port1:port2表示Zookeeper集群中有三个节点及其id、ip地址。port1是follower和leader交换消息的端口,port2表示follower选举leader使用的端口。 

4.3 启动与使用方法

//在各个节点的linux命令行依次执行启动:
zkServer.sh start
//jps 或zkServer.sh status 命令查看进程是否成功启动
zkServer.sh status
//进入zk命令行环境
zcCli.sh -server 192.168.10.1:2181
//help命令是查看命令列表
help
//ls <path> 指令,可以查看各个Znode信息
ls /hbase

4.4 HBase使用独立的Zookeeper服务

默认情况下,HBase利用自带的ZK提供分布式协调服务。默认使用自带ZK命令参数为:HBASE_MANAGES_ZK=true,随着HBase的启动而启动。

//手动控制其Zookeeper的启停
hbase-daemons sh {start,stop} zookeeper

 如果使用独立的ZK服务,则需要确保ZK集群先于HBase集群启动。如果有多个HBase集群共享一个Zookeeper服务,则需要在各自的配置文件中,配置不同的Znode入口,方法是hbase-site.xml中配置‘zookeeper.znode.parent‘为不同的路径。(默认值为/hbase)

5、集群间同步复制

  集群间同步(Replication机制),指同时配置两个HBase集群,其中一个为主集群,负责接收所有的写操作。从集群不断从主集群同步数据,但尽可读取。类似于关系型数据库中的读写分离。

  集群间同步采用主集群推送方式进行,推送的过程:主集群节点进行flush时,向从集群某个节点发送同步通知,从集群节点获取通知后,读取主集群相应节点上的WAL预写日志,在本地重建数据。

  主集群节点发送通知之后,会在Zookeeper中记录已备份WAL的偏移量,使得从集群可以确认备份的位置。如果主集群节点发现之前联系的从集群节点无响应,则会更换另一个从集群节点再次发出备份通知,并回滚WAL的偏移量记录信息。集群复制原理:

技术图片

    同步工作是异步进行的,即主从集群之间只能保证数据的最终一致性。

配置集群间复制的基本方法:

1、要确保两个集群之间的网络互通。
2、在从集群建立同名、同结构(具有相同列族)的表。
3、主集群的hbase-site.xml中配置hbase.replication参数为true,即设置为允许跨集群同步。
4、在主节点集群的HBase Shell中修改表结构。

disable Test //Test为表名
alter Test,{NAME =>basic,REPLICAITON_SCOPE => 1} //列族basic,集群复制的配置是针对列族的,REPLICATION_SCOPE属性不能在建表时设置,只能通过alter命令设置。
enable Test

5、在主集群节点HBase Shell中执行命令。

add_peer 1,"slave-zk-1,slave-zk-2,...:2181:/hbase"

  让主集群看到从集群的Zookeeper地址、端口、HBase使用的Znode入口(默认为/hbase)等。主从集群可以使用两套Zookeeper,也可以使用同一套Zookeeper,但使用同一套Zookeeper时,主从集群的Znode入口必须是不同的,即hbase-site.xml中zookeeper.znode.parent配置内容是不同的,一般体现在/hbase这一部分不同。
  使用编号1(peerid)可以用来完成复制同步的监控管理:

remove_peer 1
enable_peer 1
disable_peer 1

此外,通过list_peers命令,可以查看复制同步关系列表。

通过stop_replication和start_replication命令用来控制主集群上所有的复制同步启停。

6、HBase的扩展

HBase存在的问题:

  1. 分布式数据处理和统计问题
  2. 二级索引问题
  3. 时序数据存储问题
  4. 提供类似关系型数据库的功能和操作方式

6.1 协处理器机制(Coprocessor)

  协处理器是一个类似于MapReduce的并行处理组件,其基本思想:移动计算的代码远比移动数据低。通过把子任务(类似Map)代码分发到各个Regionserver上,让子任务独立地在各个服务器、表或分区上运行,来实现对数据的监控和操作。

  协处理器机制提供了一套编程框架,用户可以非常灵活地编写自动给你一的Coprocessor任务,并且用户还可以级联使用多个Corprocessor组件,完成更复杂的自定义功能。

  协处理器分别是Observer和Endpoint两种模式,Observer模式就如同关系型数据库中的触发器,Endpoint模式就如同关系型数据库中的存储过程。Observer可以分给三种类型,分别为RegionObserver、MasterObserver、WALObserver。其中Regionserver是region上的触发器,MasterObserver 是master服务器的触发器,而WALObserver用于预写日志的触发器。

应用协处理器机制可以极大地扩展HBase的能力,举例来说,用户可以通过三种思路,以自动开发协处理器的方式建立二级索引。

  • 1.基于WALObserver在一个索引表内生成索引,通过拦截预写日志的写入操作,把相应的键值对更新信息存储到索引表中。
  • 2.基于RegionObserver在同一个分区内维护一个索引列族,通过拦截分区的put、delete等操作,提取相应信息存储到同一个分区的索引列族中,这种方式的索引是局部索引,不支持全局排序。
  • 3.基于RegionObserver在一个索引表内生成索引,通过拦截put、delete等操作,提取相应数据更新信息存储到索引表中。

HBase中已经实现了集合函数组件、多行事务组件、多行条件删除组件等基于协处理器框架的组件。如phoenix、openTSDB等独立的HBase扩展软件也通过利用协处理器机制,实现更丰富的功能。

6.2 基于HBase的分布式处理

HBase只提供了数据管理和查询功能,如果对数据进行统计、聚合等操作则需要借助分布式处理架构。

大数据领域常见的分布式处理架构:MapReduce、Spark等的输入输出一般都是HDFS上的文件,但也可设置为从HBase表读取数据,或将数据写入HBase.

1.基于MapReduce的分布式处理

  HBase中的数据导入导出、行计数操作等都是调用MapReduce实现的。

  用户也可以自定义开发MapReduce程序,指示器连接HBase的Zookeeper地址,并获得目标表格和分区信息。通过在MapReduce代码中嵌入scan和put的方法,实现并行从HBase表中查询数据,或将数据并行写入到HBase表中。

2.基于Spark的分布式处理

  Spark是一种新兴的开源分布式处理框架,其性能优于MapReduce。

3.与Hive工具联合使用

  Hive是基于Hadoop系统的分布式数据仓库,能够提供增删改查等数据操作。它将结构化的数据文件映射为一张数据库表,并提供简单的类似SQL中语法的HiveQL语言进行数据查询。Hive通过对Hive QL语句进行语法分析、编译、优化、生成查询计划、最后大部分任务转换为MapReduce或Tez任务,小部分Hive QL语句直接进行处理,不转化为MapReduce。

  Hive和HBase可以很方便的结合使用,其方式是使用Hive自带的hive-hbase-handler组件把hive和hbase结合起来。将数据存储到HBase,在进行复杂数据处理时,通过使用hive对hive QL语句进行操作,把Hive QL命令转化为操作Hbase指令或MapReduce任务,实现数据的写入、查询或其他复杂操作。

技术图片

   Hive工具提供了类似HBase Shell的命令行环境,通过下面的命令可以在启动Hive的Shell环境时,建立和HBase主节点或Zookeeper集群中的META表的联系:

//在Hive的shell环境下执行Hive QL语句
hive --hiveconfhbase.master=nodel:16000 hive --hiveconfhbase.zookeeper.quorum=node1
CREATE TABLE thehivetable(key int,value string) STORED BY org.apache.hadoop.hive.hbase.HBaseStorageHandler  
WITH SERDEPROPERTIES ("hbase.columns.mapping"=":key,cf1:val") TBLPROPERTIES ("hbase.table.name" = "thehbasetable "); //建立一个Hive-HBase联合表,关键参数:STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler‘,指明了存储方式。
//该表在Hive中的表名为thehivetable,可以看做一个普通的面向行的表(即类似关系型数据库表)。具有key 和 value两个列,前者为整型,后者为字符串类型。
//hbase的表名为thehbasetable,将thehivetable表中的第一列映射为行键,第二列映射列族cf1中的列val
//在Hive的Shell,通过下面命令看到thehivetable表,描述其结构
showtables
describethehivetable

//在hive中删除表,hbase的表也会被删掉
droptablethehivetable

//在hbase的shell命令
list
desc thehbasetable

 

以上是关于ZK节点间数据同步以及API实践的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper的基本原理(zk架构zk存储结构watch机制独立安装zk集群间同步复制)

zk干货

zookeeper 伪分布安装

Zookeeper实践篇-Zookeeper经典场景实践

Zookeeper简介

使用linux脚本shell检查大数据各节点服务是否运行正常