HBase详解
Posted 安小猿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HBase详解相关的知识,希望对你有一定的参考价值。
文章目录
HBase简介
HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩、实时读写的分布式数据库
利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为其分布式协同服务,主要用来存储非结构化和半结构化的松散数据(列存 NoSQL 数据库)。
HBase特点
-
大:一个表可以有上亿行,上百万列。
-
面向列:面向列表(簇)的存储和权限控制,列(簇)独立检索。
-
稀疏:对于为空(NULL)的列,并不占用存储空间,因此,表可以设计的非常稀疏。
-
无模式:每一行都有一个可以排序的主键和任意多的列,列可以根据需要动态增 加,同一张表中不同的行可以有截然不同的列。
-
数据多版本:每个单元中的数据可以有多个版本,默认情况下,版本号自动分配, 版本号就是单元格插入时的时间戳。
-
数据类型单一:HBase中的数据都是字节数组,没有类型。
HBase架构
Master
- 为Region server分配region
- 负责Region server的负载均衡
- 发现失效的Region server并重新分配其上的region
- 管理用户对table的增删改操作
RegionServer
- Region server维护region,处理对这些region的IO请求
- Region server负责切分在运行过程中变得过大的region
Region
-
HBase自动把表水平划分成多个区域(region),每个region会保存一个表里面某段连续的数据;每个表一开始只有一个region,随着数据不断插入表,region不断增大,当增大到一个阀值的时候,region就会等分会两个新的region(裂变)。
-
当table中的行不断增多,就会有越来越多的region。这样一张完整的表被保存在多个Regionserver 上。
Memstore&Storefile
-
一个region由多个store组成,一个store对应一个CF(列族)store包括位于内存中的memstore和位于磁盘的storefile写操作先写入memstore,当memstore中的数据达到某个阈值,hregionserver会启动flashcache进程写入storefile,每次写入形成单独的一个storefile
-
当storefile文件的数量增长到一定阈值后,系统会进行合并(minor、major compaction),在合并过程中会进行版本合并和删除工作(majar),形成更大的storefile
-
当一个region所有storefile的大小和数量超过一定阈值后,会把当前的region分割为两个,并由hmaster分配到相应的regionserver服务器,实现负载均衡
-
客户端检索数据,先在memstore找,找不到再找storefile
HLog
-
HLog文件就是一个普通的Hadoop Sequence File,Sequence File 的Key是HLogKey对象,HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括 sequence number和timestamp,timestamp是”写入时间”,sequence number的起始值为0,或者是最近一次存入文件系统中sequence number。
-
HLog SequeceFile的Value是HBase的KeyValue对象,即对应HFile中的KeyValue。
HBase数据模型
RowKey(行键)
-
唯一标识一行数据
-
可以通过RowKey获取一行数据
-
按照字典顺序排序的。
-
Row key只能存储64k的字节数据 10-100byte
ColumnFamily&Qualifier(列簇和列)
-
HBase表中的每个列都归属于某个列族,列族必须作为表模式(schema)定义的一部分预先给出。如 create ‘test’, ‘course’。
-
列名以列族作为前缀,每个“列族”都可以有多个列成员(column);如course:math, course:english, 新的列族成员(列)可以随后按需、动态加入。
-
权限控制、存储以及调优都是在列族层面进行的;
-
HBase把同一列族里面的数据存储在同一目录下,由几个文件保存。
TimeStamp(时间戳)
-
在HBase每个cell存储单元对同一份数据有多个版本,根据唯一的时间戳来区分每个版本之间的差异,不同版本的数据按照时间倒序排序,最新的数据版本排在最前面。
-
时间戳的类型是 64位整型。
-
时间戳可以由HBase(在数据写入时自动)赋值,此时时间戳是精确到毫秒的当前系统时间
-
时间戳也可以由客户显式赋值,如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳。
Cell(存储单元)
-
由行和列的坐标交叉决定。
-
单元格是有版本的。
-
单元格的内容是未解析的字节数组。
-
由{row key, column( = +), version} 唯一确定的单元。cell中的数据是没有类型的,全部是字节码形式存贮。
HBase读写流程
HBase Shell
HBaseAPI
常用java类
java类 | HBase数据模型 |
---|---|
Admin / HBaseAdmin/ HBaseConfiguration | 数据库 |
HTable/HTableDescriptor | 表 |
HColumnDescriptor | 列簇 |
Put/Delete/Get/Scan/ResultScanner/ | 列 |
CellUtil | 存储单元 |
示例代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
public class Demo2API {
Configuration conf = null;
Connection conn = null;
@Before
public void init() {
conf = HBaseConfiguration.create();
// 其他配置参考http://hbase.apache.org/1.4/book.html#config.files
conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
try {
conn = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
// 创建表
@Test
public void create_table() throws IOException {
Admin admin = conn.getAdmin();
// admin 相当于HBase的管理员
// 创建表 传入表名(TableName.valueOf())
HTableDescriptor tableName = new HTableDescriptor(TableName.valueOf("tableName"));
// 创建列簇 传入列簇名
HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
// 对列簇进行一些配置
cf1.setMaxVersions(5); // 设置版本号
cf1.setTimeToLive(30); // 设置TTL时间
// 将创建好的列簇加入表
tableName.addFamily(cf1);
// 使用admin对象创建表
admin.createTable(tableName);
}
// 删除表
@Test
public void drop_table() throws IOException {
Admin admin = conn.getAdmin();
String tableName = "tableName";
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
}
// 添加一条数据
@Test
public void put() throws IOException {
// 如果想要插入数据 查询数据 需要使用Table对象
// 如果需要对表进行修改,获取表的一些配置、结构 使用HTableDescriptor对象
Table testJavaAPI = conn.getTable(TableName.valueOf("testJavaAPI"));
Put put = new Put("00001".getBytes());
put.addColumn("cf1".getBytes(), "name".getBytes(), "zhangSan".getBytes());
testJavaAPI.put(put);
}
// 获取一条数据
@Test
public void get() throws IOException {
Table testJavaAPI = conn.getTable(TableName.valueOf("testJavaAPI"));
Get get = new Get("00001".getBytes());
Result rs = testJavaAPI.get(get);
byte[] value = rs.getValue("cf1".getBytes(), "name".getBytes());
System.out.println(Bytes.toString(value));
}
// 指定rowkey范围 扫描表
@Test
public void scan() throws IOException {
Table testJavaAPI = conn.getTable(TableName.valueOf("test1"));
Scan scan = new Scan();
// 包含startRow 不包含 endRow
scan.withStartRow("001".getBytes());
scan.withStopRow("007".getBytes());
ResultScanner scanner = testJavaAPI.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
byte[] row = rs.getRow();// 获取rowkey
String rk = Bytes.toString(row);
System.out.println();
if ("001".equals(rk)) {
byte[] value = rs.getValue("cf1".getBytes(), "name".getBytes());
System.out.println(Bytes.toString(value));
} else if ("002".equals(rk)) {
byte[] value = rs.getValue("cf1".getBytes(), "name0".getBytes());
System.out.println(Bytes.toString(value));
byte[] value1 = rs.getValue("cf1".getBytes(), "name1".getBytes());
System.out.println(Bytes.toString(value1));
byte[] value2 = rs.getValue("cf1".getBytes(), "name100".getBytes());
System.out.println(Bytes.toString(value2));
byte[] value3 = rs.getValue("cf1".getBytes(), "name2".getBytes());
System.out.println(Bytes.toString(value3));
byte[] value4 = rs.getValue("cf1".getBytes(), "name3".getBytes());
System.out.println(Bytes.toString(value4));
byte[] value5 = rs.getValue("cf1".getBytes(), "name4".getBytes());
System.out.println(Bytes.toString(value5));
byte[] value6 = rs.getValue("cf1".getBytes(), "name5".getBytes());
System.out.println(Bytes.toString(value6));
} else if ("007".equals(rk)) {
byte[] value6 = rs.getValue("cf1".getBytes(), "name".getBytes());
System.out.println(Bytes.toString(value6));
byte[] value7 = rs.getValue("cf1".getBytes(), "age1".getBytes());
System.out.println(Bytes.toString(value7));
}
rs = scanner.next();
}
}
@Test
public void cellUtil() throws IOException {
Table testJavaAPI = conn.getTable(TableName.valueOf("test1"));
Scan scan = new Scan();
// 包含startRow 不包含 endRow
scan.withStartRow("001".getBytes());
scan.withStopRow("007".getBytes());
ResultScanner scanner = testJavaAPI.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
for (Cell cell : rs.listCells()) {
byte[] rk = CellUtil.cloneRow(cell);
byte[] cf = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
byte[] value = CellUtil.cloneValue(cell);
System.out.println("rowkey:" + Bytes.toString(rk) + ",columnsFamily:" + Bytes.toString(cf) + ",qualifier:" + Bytes.toString(qualifier) + ",value:" + Bytes.toString(value));
}
rs = scanner.next();
}
}
// 读取文件并写入HBase
@Test
public void putAll() throws IOException {
Admin admin = conn.getAdmin();
// 判断表是否存在,不存在即创建
if (!admin.tableExists(TableName.valueOf("students"))) {
HTableDescriptor students = new HTableDescriptor(TableName.valueOf("students"));
HColumnDescriptor info = new HColumnDescriptor("info");
students.addFamily(info);
admin.createTable(students);
}
Table students = conn.getTable(TableName.valueOf("students"));
// 读取文件
BufferedReader br = new BufferedReader(new FileReader("data/students.txt"));
String line = br.readLine();
while (line != null) {
String[] splits = line.split(",");
String rk = splits[0];
String name = splits[1];
String age = splits[2];
String gender = splits[3];
String clazz = splits[4];
Put put = new Put(Bytes.toBytes(rk));
put.addColumn("info".getBytes(), "name".getBytes(), name.getBytes());
put.addColumn("info".getBytes(), "age".getBytes(), age.getBytes());
put.addColumn("info".getBytes(), "gender".getBytes(), gender.getBytes());
put.addColumn("info".getBytes(), "clazz".getBytes(), clazz.HBase Filter 过滤器之QualifierFilter详解