HBase之DML操作

Posted 健哥说编程

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HBase之DML操作相关的知识,希望对你有一定的参考价值。

DML命令

Group name: dml

  Commands: append, count, delete, deleteall, get, get_counter, get_splits, incr, put, scan, truncate, truncate_preserve

 


1append

 

Append用于将现有数据后面追加数据:

如已经存在的数据的值为:Rose

hbase(main):091:0> t1.get "T001"

COLUMN                           CELL     

 info:name      timestamp=1529374882240, value=Rose     

1 row(s) in 0.0200 seconds

现在将新的数据追加到Jack的后面,就可以使用append

hbase(main):092:0> append "t1","T001","info:name","Mary"

0 row(s) in 0.0420 seconds

再次查询,已经发现Mary已经追加到了Rose的后面:

hbase(main):093:0> t1.get "T001"

COLUMN       CELL         

 info:name      timestamp=1529379406241, value=RoseMary            

1 row(s) in 0.0350 seconds

 

或使用以下方式追加:

hbase(main):005:0> t1.append "T001","info:name","Smith"

0 row(s) in 0.0740 seconds

 

如果在使用appendrowkey不存在 ,则为创建一个新的rowkey

hbase(main):007:0> t1.append "T002","info:name","Smith"

0 row(s) in 0.0070 seconds

hbase(main):008:0> t1.scan

ROW   COLUMN+CELL                                       

 T001          column=info:name, timestamp=1529388939223, value=RoseMarySmith                      

 T002    column=info:name, timestamp=1529388983245, value=Smith        

2 row(s) in 0.0200 seconds

上面的T002则为新添加的rowkey

 

如果列的标识不存在,也会添加一个新的记录:

hbase(main):009:0> t1.append "T003","info:addr","SDJN"

0 row(s) in 0.0200 seconds

hbase(main):010:0> t1.scan

ROW    COLUMN+CELL                           

 T001         column=info:name, timestamp=1529388939223, value=RoseMarySmith    

 T002    column=info:name, timestamp=1529388983245, value=Smith          

 T003    column=info:addr, timestamp=1529389131241, value=SDJN                                    

3 row(s) in 0.0430 seconds

 

使用Java代码追加数据:

Table table = con.getTable(TableName.valueOf("t1"));

Append append = new Append(Bytes.toBytes("T001"));

append = append.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes("Jerry"));

table.append(append);

table.close();

和命令一样,如果rowkey不存在,则为添加一个新的行。

 

2count

计数的:

hbase> count 'ns1:t1'

 hbase> count 't1'

 hbase> count 't1', INTERVAL => 100000

 hbase> count 't1', CACHE => 1000

 hbase> count 't1', INTERVAL => 10, CACHE => 1000

 

也可以通过调用MapReduce实现行的统计:

$HBASE_HOME/bin/hbase   org.apache.hadoop.hbase.mapreduce.RowCounter ‘tablename

 

Java代码:

hbase支持count,不过结果就是full table scan,性能表现你懂的=.=

一般的做法是自己维护一个计数器,或者用coprocessor

hbase本身最好不要用来做计算,它真不擅长干这个,把它作为存储系统加好了。

hbase shell 中有 count 命令。

方法1:使用FirstKeyOnlyFitler

// 使用Scan+FirstKeyOnlyFilter统计行数,效率不会太高,

// 这个类,说自己可以高效的统计行数-有点意思

@Test

public void count() throws Exception {

Table table = con.getTable(TableName.valueOf("t1"));

Scan sc = new Scan();

Filter filter = new FirstKeyOnlyFilter();

sc.setFilter(filter);

ResultScanner rs = table.getScanner(sc);

long count = 0;

for (Result r : rs) {

count += 1;

}

System.out.println("行数:" + count);

table.close();

}

方法2:配置全局的统计对象

 

准备工作:

hbase-site.xml中添加以下配置,然后重新启动hbase

<property>    

<name>hbase.coprocessor.user.region.classes</name>  

<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>    

</property>  

配置好上面的配置,并重新启动Hbase以后,就可以使用Aggregate进行统计了:

@Test

public void rowCounter() throws Throwable {

AggregationClient aggregationClient = new AggregationClient(config);

Scan scan = new Scan();

Long count = aggregationClient.rowCount(TableName.valueOf("t1"),

new LongColumnInterpreter(),scan);

System.out.println("行数:"+count);

aggregationClient.close();

}

方法3:独立的给某个类添加统计对象

先添加统计类,注意语法,以|开始以||结束:

hbase(main):005:0>alter "t1",{METHOD=>"table_att",'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'}

Updating all regions with the new schema...

1/1 regions updated.

Done.

0 row(s) in 2.0180 seconds

 

hbase(main):006:0> desc "t1"

Table t1 is ENABLED  

t1, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'}                                        

COLUMN FAMILIES DESCRIPTION                                    

{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1',

然后再执行以下的Java代码,就可以了:

AggregationClient aggregationClient = new AggregationClient(config);

Scan scan = new Scan();

Long count = aggregationClient.rowCount(TableName.valueOf("t1"),new LongColumnInterpreter(),scan);

System.out.println("行数:"+count);

aggregationClient.close();

 

注意上面的shell使用 | 分开的目的是可以添加多个,添加多个则hbase会以coprocessor$1这样的方式加以分开:

以下是再添加一个测试:

hbase(main):001:0> alter "t1",{METHOD=>"table_att",'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.BaseWALObserver||'}

Updating all regions with the new schema...

0/1 regions updated.

1/1 regions updated.

Done.

0 row(s) in 3.8500 seconds

然后查看这个表的信息,发现(协处理器)coprocessor后面使用$1,$2加以分开:

hbase(main):002:0> desc "t1"

Table t1 is ENABLED  

t1, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||', coprocessor$2 => '|org.apache.hadoop.hbase.coprocessor.BaseWALObserver||'}        

COLUMN FAMILIES DESCRIPTION                                    

{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIO

NS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                      

1 row(s) in 0.0770 seconds

 

 也可以删除这个属性:

先删除一个:

hbase(main):004:0> alter "t1",{METHOD=>"table_att_unset",NAME=>'coprocessor$1'}

Updating all regions with the new schema...

1/1 regions updated.

Done.

0 row(s) in 1.9610 seconds

再删除一个:

hbase(main):006:0> alter "t1",{METHOD=>"table_att_unset",NAME=>'coprocessor$2'}

Updating all regions with the new schema...

0/1 regions updated.

1/1 regions updated.

Done.

0 row(s) in 3.0160 seconds

 

这样删除以后,就不能再使用统计功能了。

 

 

3delete

删除一个单元格中的数据,参数t1为表名,T001为行键,info:name列族:列标识

hbase(main):017:0> delete "t1","T001","info:name"

0 row(s) in 0.0240 seconds

或是使用以下语法删除:

hbase(main):021:0> t1.delete "T002","info:name"

0 row(s) in 0.0230 seconds

Java代码:

根据rowkey删除某个列:

@Test

public void delete() throws Exception {

Delete delete = new Delete("T001".getBytes());

delete.addColumn("info".getBytes(), "name".getBytes());

Table table = con.getTable(TableName.valueOf("t1"));

try {

table.delete(delete);

System.out.println("删除成功..");

} catch (IOException e) {

System.out.println("删除失败..");

}

table.close();

}


4deleteall删除

删除这个rowkey表示的所有的记录

Delete all cells in a given row; pass a table name, row, and optionally

a column and timestamp. Examples:

  hbase> deleteall 'ns1:t1', 'r1'

  hbase> deleteall 't1', 'r1'

  hbase> deleteall 't1', 'r1', 'c1'

  hbase> deleteall 't1', 'r1', 'c1', ts1

 

根据某个rowkey,删除这个rowkey的所有列:

同样,还是使用Delete对象,只是不必再传递列名即可:

@Test

public void deleteall() throws Exception {

Delete delete = new Delete("T001".getBytes());

Table table = con.getTable(TableName.valueOf("t1"));

try {

table.delete(delete);

System.out.println("删除成功");

} catch (Exception e) {

System.out.println("删除失败");

}

table.close();

}

 

5、get_counter

特点注意:

Get_counter只能读取long类型的数据,不能读取bytes类型的数据。但在默认情况下,hbase保存的所有的数据值都是bytes类型的。不过,可以使用incr保存一个long类型的值。

Hbase > incr t1,info:counter 默认值为1

帮助信息:

hbase(main):042:0> help "get_counter"

Return a counter cell value at specified table/row/column coordinates.

A counter cell should be managed with atomic increment functions on HBase

and the data should be binary encoded (as long value). Example:

 

  hbase> get_counter 'ns1:t1', 'r1', 'c1'

  hbase> get_counter 't1', 'r1', 'c1'

 

The same commands also can be run on a table reference. Suppose you had a reference

t to table 't1', the corresponding command would be:

通过上面的帮助信息,可见,get_counter用于获取long类型的值。这种值可以通过incr来设置。

 

  hbase> t.get_counter 'r1', 'c1'

hbase(main):043:0> get_counter "t1","T004","info:age"

COUNTER VALUE = 2

 

可以使用counter来记数:

hbase(main):046:0> put "t2","T001","info:name","Jack"

0 row(s) in 0.0640 seconds

设置counter的值:

hbase(main):047:0> incr "t2","C001","counter:c"

COUNTER VALUE = 1

0 row(s) in 0.0290 seconds

遍历t2表中的数据:

hbase(main):048:0> scan "t2"

ROW           COLUMN+CELL    

 C001   column=counter:c, timestamp=1529637812963, value=\x00\x00\x00\x00\x00\x00\x00\x01               

 T001   column=info:name, timestamp=1529637745536, value=Jack   

2 row(s) in 0.0250 seconds

 

使用Java代码让counter增加1:可以用于统计行数:

Put put =new Put("T003".getBytes());

put.addColumn("info".getBytes(), "name".getBytes(), "Alex".getBytes());

Table table = con.getTable(TableName.valueOf("t2"));

table.put(put);

//增加1,注意使用同一个C001的rowid,同样的columnFamily:qulify

Increment incr = new Increment("C001".getBytes());

incr.addColumn("counter".getBytes(),"c".getBytes(), 1);

table.increment(incr);

table.close();

 

也可以独立的读取counter的值:

Table table = con.getTable(TableName.valueOf("t2"));

//读取Counter的值,可以将增长值设置为0,如果是增加值可以传值设置为1或是-1(减少)

long counter = table.incrementColumnValue("C001".getBytes(), //rowkey

"counter".getBytes(),//column family

 "c".getBytes(), 0);

System.out.println("counter:"+counter);

table.close();


5get

简的说,就是查询数据的。其中表名和行键是必须要传递的参数。

根据行键查询:

hbase(main):055:0> t1.get "T001"

COLUMN      CELL     

 info:name      timestamp=1529391968315, value=Jack               

1 row(s) in 0.0340 seconds

只查询某个单元格的值,如果不存在,则返回空:

hbase(main):059:0> t1.get "T001",{COLUMN=>"info:name"}

COLUMN      CELL     

 info:name     timestamp=1529391968315, value=Jack               

1 row(s) in 0.0320 seconds

hbase(main):060:0> t1.get "T001",{COLUMN=>"info:age"}

COLUMN   CELL     

 info:age  timestamp=1529392111605, value=33                 

1 row(s) in 0.0290 seconds

hbase(main):061:0> t1.get "T001",{COLUMN=>"info:addr"}

COLUMN    CELL     

0 row(s) in 0.0050 seconds

使用过虑器:

hbase(main):068:0> t1.get "T001",{FILTER=>"ValueFilter(=,'binary:Jack')"}

COLUMN      CELL     

 info:addr    timestamp=1529392577775, value=Jack               

 info:name   timestamp=1529391968315, value=Jack

 

Java代码:

Table table = con.getTable(TableName.valueOf("t1"));

Get get = new Get("T001".getBytes());

Result result = table.get(get);

CellScanner cs = result.cellScanner();

while (cs.advance()) {

Cell cell = cs.current();

System.out.println(Bytes.toString(CellUtil.cloneFamily(cell)) +//

":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "="//

+ Bytes.toString(CellUtil.cloneValue(cell)));

}

table.close();

6、incr增量

用于增加某个long类型数据的值。如果给定的rowkey存在 则会创建新行。

示例:

hbase(main):054:0> incr "t1","0","info:counter"

COUNTER VALUE = 1

//设置步长为10

hbase(main):055:0> incr "t1","0","info:counter",10

COUNTER VALUE = 11

//默认步长为1

hbase(main):056:0> t1.incr "0","info:counter"

COUNTER VALUE = 12

 

读取这个值:

hbase(main):057:0> get_counter "t1","0","info:counter"

COUNTER VALUE = 12

 

使用Java代码读取这个值:

Table table = con.getTable(TableName.valueOf("t1"));

long counter = table.incrementColumnValue("0".getBytes(), //指定rowkey

"info".getBytes(),//指定列族

"counter".getBytes(),//指定列的标识

0);//指定步长为0,即只查询不增长


7、scan遍历

过虑:

 

遍历全部

hbase(main):094:0> t1.scan

ROW    COLUMN+CELL                                       

 T001        column=info:addr, timestamp=1529392577775, value=Jack       

 T001   column=info:age, timestamp=1529392111605, value=33                      

3 row(s) in 0.0310 seconds

Java代码:

Scan scan = new Scan();

//因为这个表中有存在其他的rowkey所以,只过虑某些rowkey开始的

//不能使用正则表达式

Filter filter = new PrefixFilter("T".getBytes());

scan.setFilter(filter);

Table table = con.getTable(TableName.valueOf("t1"));

ResultScanner rs = table.getScanner(scan);

for (Result r : rs) {

String rowkey = Bytes.toString(r.getRow());

System.out.println(rowkey);

}

table.close();

 

PrefixFilter

即前缀过虑器。

构造函数:

public PrefixFilter(final byte [] prefix) {

    this.prefix = prefix;

}

 

只能过虑rowkey的值:

hbase(main):095:0> t1.scan FILTER=>"PrefixFilter('T')"

ROW     COLUMN+CELL                                       

 T001        column=info:addr, timestamp=1529392577775, value=Jack       T00      column=info:age, timestamp=1529392111605, value=33                                                         

3 row(s) in 0.0340 seconds

或是:

hbase(main):096:0> scan "t1",{FILTER=>"PrefixFilter('T')"}

结果同上。

Java代码:

Scan scan = new Scan();

PrefixFilter prefixFilter = new PrefixFilter("T".getBytes());

scan.setFilter(prefixFilter);

Table table = con.getTable(TableName.valueOf("t1"));

ResultScanner rs = table.getScanner(scan);

for (Result r : rs) {

String rowkey = Bytes.toString(r.getRow());

System.out.println(rowkey);

CellScanner cs = r.cellScanner();

while (cs.advance()) {

Cell cel = cs.current();

System.out.println(Bytes.toString(CellUtil.cloneFamily(cel))//

+ ":" + Bytes.toString(CellUtil.cloneQualifier(cel)) + //

"=" + Bytes.toString(CellUtil.cloneValue(cel)));

}

}

table.close();

 

 

ValueFilter

值过虑。

构造方法

public ValueFilter(final CompareOp valueCompareOp,

      final ByteArrayComparable valueComparator)

通过shell命令

hbase(main):098:0> scan "t1",{FILTER=>"ValueFilter(=,'binary:Jack')"}

ROW                              COLUMN+CELL                                                                                

 T001                            column=info:addr, timestamp=1529392577775, value=Jack                                      

 T001                            column=info:name, timestamp=1529391968315, value=Jack

或是:

hbase(main):099:0> t1.scan FILTER=>"ValueFilter(=,'binary:Jack')"

//更多过虑器,可以使用substringregexstringbinary做为查询前缀:

hbase(main):092:0> t1.scan FILTER=>"ValueFilter(=,'substring:Jerry')"

hbase(main):093:0> t1.scan FILTER=>"ValueFilter(=,'regexstring:Jerry')"

hbase(main):094:0> scan "t1",{FILTER=>"ValueFilter(=,'substring:Jerry')"}

hbase(main):095:0> scan "t1",{FILTER=>"ValueFilter(=,'substring:J.*')"}

hbase(main):096:0> scan "t1",{FILTER=>"ValueFilter(=,'regexstring:J.*')"}

 

Java代码
BinaryComparator二进制比较

ValueFilter valueFilter = //

new ValueFilter(CompareOp.EQUAL, new BinaryComparator("Jerry".getBytes()));

scan.setFilter(valueFilter);

 

SubstringComparator字符串比较

Scan scan = new Scan();

ValueFilter valueFilter = //

new ValueFilter(CompareOp.EQUAL, new SubstringComparator("Jerry"));

scan.setFilter(valueFilter);

 

RegexStringComparator正则表达式

Scan scan = new Scan();

ValueFilter valueFilter = //

new ValueFilter(CompareOp.EQUAL, new RegexStringComparator(".*e.*",Pattern.MULTILINE));

scan.setFilter(valueFilter);

 

更多Comparator不再展示。

 

 

 

 

ANDOR的使用

hbase(main):100:0> t1.scan FILTER=>"ValueFilter(=,'binary:Jack') AND PrefixFilter('T')"

 

COLUMNS=>[info:name]

用于只显示某些列

hbase(main):121:0> t1.scan COLUMNS=>['info:name',"info:age"或:

hbase(main):127:0> scan "t1",{COLUMNS=>['info:age','info:name']}

 

还可以再使用过虑器:

hbase(main):129:0> scan "t1",{COLUMNS=>['info:age','info:name'],FILTER=>"PrefixFilter('A')"}

 

Keyonlyfilter默认只显示key

hbase(main):132:0> scan "t1",{FILTER=>"KeyOnlyFilter()"} 

注意上面的结果,都没有value的值。

 

ColumnCountGetFilter

显示到某个列的个数为设置为个数是停止查询后面的数据。这个比较怪异。不知识会有什么用。

如:

hbase(main):159:0> t1.scan

 

上面的结果中,T0025行,执行以下查询:

hbase(main):161:0> scan "t1",{FILTER=>"ColumnCountGetFilter(4)"}

ROW                              COLUMN+CELL                                  

 T002                            column=info:name, timestamp=1529391977614, value=Rose                                      

2 row(s) in 0.0090 seconds

 

由于T0014行所以全部显示,由于T0025行,所以只显示前4行,然后后面的数据将不再显示。

          

以下是官方API

A filter that will only return the first KV from each row.

This filter can be used to more efficiently perform row count operations.可以用于统计行数。

 

现在假设存在以下数据,注意T001rokey为两个列信息:

hbase(main):039:0> t1.scan

ROW COLUMN+CELL  

2 row(s) in 0.0320 seconds

FirtKeyOnlyFilter就是返回每一行的第一个列的值,通过以下返回,可见T001只返回了info:age

hbase(main):040:0> t1.scan FILTER=>"FirstKeyOnlyFilter()"

ROW  COLUMN+CELL                                           

FirstKeyOnlyFilter的使用测试:

//使用Scan+FirstKeyOnlyFilter统计行数,效率不会太高,

//这个类,说自己可以高效的统计行数-有点意思

@Test

public void count() throws Exception{

Table table = con.getTable(TableName.valueOf("t1"));

Scan sc = new Scan();

Filter filter = new FirstKeyOnlyFilter();

sc.setFilter(filter);

ResultScanner rs = table.getScanner(sc);

long count = 0;

for(Result r:rs) {

count+=1;

}

System.out.println("行数:"+count);

table.close();

}

                                                                                           

RowFilter      

RowFilter构造方法:

 public RowFilter(final CompareOp rowCompareOp,

final ByteArrayComparable rowComparator)      

 

 第一个参数为比较的规则,是一个枚举类型:

 

第二个参数为具体值,ByteArrayComparable的子类有:

 

 

BinaryComparator字节码比较

//使用RowFitler加BinaryComparator查询rowkeyT001的

RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, //

new BinaryComparator("T001".getBytes()));

Scan sc = new Scan();

sc.setFilter(rowFilter);

Table table = con.getTable(TableName.valueOf("t1"));

ResultScanner rs = table.getScanner(sc);

for (Result r : rs) {

String rowkey = Bytes.toString(r.getRow());

System.out.println(rowkey);

CellScanner cs = r.cellScanner();

while (cs.advance()) {

Cell cel = cs.current();

System.out.println(Bytes.toString(CellUtil.cloneFamily(cel))//

+ ":" + Bytes.toString(CellUtil.cloneQualifier(cel)) + //

"=" + Bytes.toString(CellUtil.cloneValue(cel)));

}

}

table.close();

 

在命令行中执行如下,以binary为前缀,即可以表达为二进制类型:

hbase(main):083:0> scan "t1",{FILTER=>"RowFilter(=,'binary:T001')"}

ROW  COLUMN+CELL                                                  

1 row(s) in 0.0180 seconds

 

BinaryPrefixComparator字节码前缀

以下查询前缀以T开始的rowkey

RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, //

new BinaryPrefixComparator("T".getBytes()));

 

在命令行中的查询,以binaryprefix做为前缀:

hbase(main):085:0> scan "t1",{FILTER=>"RowFilter(=,'binaryprefix:T')"}

ROW  COLUMN+CELL                   

 T001   column=info:name, timestamp=1529635711054, value=Jerry     

 

 

SubstringComparator匹配字符串

RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, //

new SubstringComparator("T001"));

 

shell中执行使用substring做为前缀:

hbase(main):086:0> scan "t1",{FILTER=>"RowFilter(=,'substring:T001')"}

ROW COLUMN+CELL                                             

1 row(s) in 0.0300 seconds

 

RegexStringComparator正则表达式

以下使用正则表达式,也同样查询以T开始的rowkey的值:

RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, //

new RegexStringComparator("T.*"));

 

Shell中使用正则表达式:

hbase(main):087:0> scan "t1",{FILTER=>"RowFilter(=,'regexstring:T.*')"}

ROW  COLUMN+CELL    

             

PageFilter  

返回前N行的数据

hbase(main):179:0> t1.scan FILTER=>"PageFilter(1)"

ROW                              COLUMN+CELL      

也可以和Startrowstoprow共同使用:

hbase(main):186:0> scan "t1",{FILTER=>"PageFilter(2)",STARTROW=>"T002",STOPROW=>"T003"}

       

1 row(s) in 0.0080 seconds

                                                                   

ValueFilter

值过虑:

hbase(main):164:0> t1.scan FILTER=>"ValueFilter(=,'binary:Jack')"

                                          

ColumnPaginationFilter

Limit查询范围

LIMIT=>查询行数限制,STARTROW=>从哪一个rowkey开始查询。

hbase(main):169:0> scan "t1",{LIMIT=>1,STARTROW=>'T002'                                    

1 row(s) in 0.0150 seconds

可用于分页查询。

 


以上是关于HBase之DML操作的主要内容,如果未能解决你的问题,请参考以下文章

HBase的java客户端测试---DML操作

Hbase总结

HBASE基础使用Java API实现DDL与DML

HBase shell操作

hbase的基本操作

大数据ClickHouse(十六):ClickHouse SQL语法之DML 操作