HBase DML Operations
Posted 健哥说编程
DML commands
Group name: dml
Commands: append, count, delete, deleteall, get, get_counter, get_splits, incr, put, scan, truncate, truncate_preserve
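1. append
append adds data to the end of an existing cell value.
Suppose the existing value is Rose (t1 here is a shell table reference, obtained for example with t1 = get_table 't1'):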
hbase(main):091:0> t1.get "T001"
COLUMN CELL
info:name timestamp=1529374882240, value=Rose
1 row(s) in 0.0200 seconds
To add new data after Rose, use append:
hbase(main):092:0> append "t1","T001","info:name","Mary"
0 row(s) in 0.0420 seconds
Querying again shows that Mary has been appended after Rose:
hbase(main):093:0> t1.get "T001"
COLUMN CELL
info:name timestamp=1529379406241, value=RoseMary
1 row(s) in 0.0350 seconds
Or append through the table reference:
hbase(main):005:0> t1.append "T001","info:name","Smith"
0 row(s) in 0.0740 seconds
If the rowkey does not exist when append is used, a new row is created:
hbase(main):007:0> t1.append "T002","info:name","Smith"
0 row(s) in 0.0070 seconds
hbase(main):008:0> t1.scan
ROW COLUMN+CELL
T001 column=info:name, timestamp=1529388939223, value=RoseMarySmith
T002 column=info:name, timestamp=1529388983245, value=Smith
2 row(s) in 0.0200 seconds
T002 above is the newly added rowkey.
If the column qualifier does not exist, a new cell is added as well:
hbase(main):009:0> t1.append "T003","info:addr","SDJN"
0 row(s) in 0.0200 seconds
hbase(main):010:0> t1.scan
ROW COLUMN+CELL
T001 column=info:name, timestamp=1529388939223, value=RoseMarySmith
T002 column=info:name, timestamp=1529388983245, value=Smith
T003 column=info:addr, timestamp=1529389131241, value=SDJN
3 row(s) in 0.0430 seconds
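Appending data with Java code:
// Assumes con is an open org.apache.hadoop.hbase.client.Connection
try (Table table = con.getTable(TableName.valueOf("t1"))) {
    Append append = new Append(Bytes.toBytes("T001"));
    // add(...) is the HBase 1.x method; HBase 2.x deprecates it in favor of addColumn(...)
    append.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Jerry"));
    table.append(append);
}
As with the shell command, if the rowkey does not exist, a new row is added.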
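To make the append semantics concrete, here is a minimal plain-Java sketch that does not talk to HBase. It models a cell value as a byte array and treats a missing cell as empty, which mirrors the shell session above (Rose becomes RoseMary, and a missing T002 becomes Smith). The names AppendSketch, appendBytes, and appendText are hypothetical and exist only for this illustration.

```java
import java.nio.charset.StandardCharsets;

final class AppendSketch {
    // Concatenate suffix onto existing; a missing cell (null) behaves like an empty value.
    static byte[] appendBytes(byte[] existing, byte[] suffix) {
        byte[] base = existing == null ? new byte[0] : existing;
        byte[] out = new byte[base.length + suffix.length];
        System.arraycopy(base, 0, out, 0, base.length);
        System.arraycopy(suffix, 0, out, base.length, suffix.length);
        return out;
    }

    // Convenience wrapper for text values, using UTF-8 in both directions.
    static String appendText(String existing, String suffix) {
        byte[] base = existing == null ? null : existing.getBytes(StandardCharsets.UTF_8);
        byte[] joined = appendBytes(base, suffix.getBytes(StandardCharsets.UTF_8));
        return new String(joined, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(appendText("Rose", "Mary")); // RoseMary
        System.out.println(appendText(null, "Smith"));  // Smith
    }
}
```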
2. count
Counts the rows in a table:
hbase> count 'ns1:t1'
hbase> count 't1'
hbase> count 't1', INTERVAL => 100000
hbase> count 't1', CACHE => 1000
hbase> count 't1', INTERVAL => 10, CACHE => 1000
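Rows can also be counted by running a MapReduce job:
$HBASE_HOME/bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'tablename'
Java code:
HBase supports count (the shell has a count command), but it is implemented as a full table scan, so it performs poorly on large tables.
A common approach is to maintain your own counter, or to use a coprocessor.
HBase is not well suited to computation; it is better used purely as a storage system.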
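Method 1: use FirstKeyOnlyFilter
// Count rows with Scan + FirstKeyOnlyFilter. This is still a full table scan, so it is not very fast,
// although the filter's Javadoc says it makes row counting more efficient.
@Test
public void count() throws Exception {
    try (Table table = con.getTable(TableName.valueOf("t1"))) {
        Scan sc = new Scan();
        // Return only the first cell of each row, so less data is transferred
        sc.setFilter(new FirstKeyOnlyFilter());
        long count = 0;
        // Close the scanner when done to release server-side resources
        try (ResultScanner rs = table.getScanner(sc)) {
            for (Result r : rs) {
                count += 1;
            }
        }
        System.out.println("Row count: " + count);
    }
}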
Method 2: configure the aggregation coprocessor globally
Preparation:
Add the following configuration to hbase-site.xml, then restart HBase:
<property>
<name>hbase.coprocessor.user.region.classes</name>
<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>
Once this is configured and HBase has been restarted, AggregationClient can be used for counting:
@Test
public void rowCounter() throws Throwable {
AggregationClient aggregationClient = new AggregationClient(config);
Scan scan = new Scan();
long count = aggregationClient.rowCount(TableName.valueOf("t1"),
new LongColumnInterpreter(), scan);
System.out.println("Row count: " + count);
aggregationClient.close();
}
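Method 3: attach the aggregation coprocessor to a single table
First add the coprocessor class. Note the syntax: the value has the form 'jar path|class name|priority|arguments'; only the class name is given here, so the value starts with | and ends with ||:
hbase(main):005:0> alter "t1",{METHOD=>"table_att",'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'}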
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 2.0180 seconds
hbase(main):006:0> desc "t1"
Table t1 is ENABLED
t1, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'}
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1',
Then run the following Java code:
AggregationClient aggregationClient = new AggregationClient(config);
Scan scan = new Scan();
long count = aggregationClient.rowCount(TableName.valueOf("t1"), new LongColumnInterpreter(), scan);
System.out.println("Row count: " + count);
aggregationClient.close();
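Note that the | characters in the shell command separate the fields of one coprocessor specification. A table can carry several coprocessors, and HBase stores them under numbered attributes such as coprocessor$1:
The following adds a second one as a test: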
hbase(main):001:0> alter "t1",{METHOD=>"table_att",'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.BaseWALObserver||'}
Updating all regions with the new schema...
0/1 regions updated.
1/1 regions updated.
Done.
0 row(s) in 3.8500 seconds
Then inspect the table description. The coprocessor attributes are distinguished by the suffixes $1 and $2:
hbase(main):002:0> desc "t1"
Table t1 is ENABLED
t1, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||', coprocessor$2 => '|org.apache.hadoop.hbase.coprocessor.BaseWALObserver||'}
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
1 row(s) in 0.0770 seconds
The attribute can also be removed.
Remove the first one:
hbase(main):004:0> alter "t1",{METHOD=>"table_att_unset",NAME=>'coprocessor$1'}
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 1.9610 seconds
Then remove the other one:
hbase(main):006:0> alter "t1",{METHOD=>"table_att_unset",NAME=>'coprocessor$2'}
Updating all regions with the new schema...
0/1 regions updated.
1/1 regions updated.
Done.
0 row(s) in 3.0160 seconds
Once both are removed, the aggregation (row count) feature can no longer be used on this table.
3. delete
Deletes the data in a single cell. In the example, t1 is the table name, T001 is the rowkey, and info:name is the column family:qualifier.
hbase(main):017:0> delete "t1","T001","info:name"
0 row(s) in 0.0240 seconds
Or delete through the table reference:
hbase(main):021:0> t1.delete "T002","info:name"
0 row(s) in 0.0230 seconds
Java code:
Delete one column of a row by rowkey:
@Test
public void delete() throws Exception {
    Delete delete = new Delete(Bytes.toBytes("T001"));
    // addColumn deletes only the latest version of the cell; addColumns deletes all versions
    delete.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
    try (Table table = con.getTable(TableName.valueOf("t1"))) {
        table.delete(delete);
        System.out.println("Delete succeeded.");
    } catch (IOException e) {
        System.out.println("Delete failed: " + e.getMessage());
    }
}
4. deleteall
Deletes all cells stored under the given rowkey.
Delete all cells in a given row; pass a table name, row, and optionally
a column and timestamp. Examples:
hbase> deleteall 'ns1:t1', 'r1'
hbase> deleteall 't1', 'r1'
hbase> deleteall 't1', 'r1', 'c1'
hbase> deleteall 't1', 'r1', 'c1', ts1
Delete all columns of a given rowkey:
Again use a Delete object, this time without specifying a column:
@Test
public void deleteall() throws Exception {
    // A Delete with only a rowkey removes every cell in that row
    Delete delete = new Delete(Bytes.toBytes("T001"));
    try (Table table = con.getTable(TableName.valueOf("t1"))) {
        table.delete(delete);
        System.out.println("Delete succeeded.");
    } catch (IOException e) {
        System.out.println("Delete failed: " + e.getMessage());
    }
}
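5. get_counter
Note:
get_counter can only read values stored as a long (8 bytes); it cannot read arbitrary byte values. By default, the values HBase stores are plain bytes (for example, put from the shell stores the bytes of the string it is given). A long value can be stored with incr, whose increment defaults to 1 (r1 below is a placeholder rowkey):
hbase> incr "t1","r1","info:counter"
Help text: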
hbase(main):042:0> help "get_counter"
Return a counter cell value at specified table/row/column coordinates.
A counter cell should be managed with atomic increment functions on HBase
and the data should be binary encoded (as long value). Example:
hbase> get_counter 'ns1:t1', 'r1', 'c1'
hbase> get_counter 't1', 'r1', 'c1'
The same commands also can be run on a table reference. Suppose you had a reference
t to table 't1', the corresponding command would be:
hbase> t.get_counter 'r1', 'c1'
As the help text above shows, get_counter reads long values, and such values can be set with incr:
hbase(main):043:0> get_counter "t1","T004","info:age"
COUNTER VALUE = 2
A counter can be used to keep a count. First put a row:
hbase(main):046:0> put "t2","T001","info:name","Jack"
0 row(s) in 0.0640 seconds
Set the counter value:
hbase(main):047:0> incr "t2","C001","counter:c"
COUNTER VALUE = 1
0 row(s) in 0.0290 seconds
Scan the data in table t2:
hbase(main):048:0> scan "t2"
ROW COLUMN+CELL
C001 column=counter:c, timestamp=1529637812963, value=\x00\x00\x00\x00\x00\x00\x00\x01
T001 column=info:name, timestamp=1529637745536, value=Jack
2 row(s) in 0.0250 seconds
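The counter value in the scan output above, \x00\x00\x00\x00\x00\x00\x00\x01, is simply the long 1 written as 8 big-endian bytes. The following plain-Java sketch (no HBase dependency) reproduces that layout with java.nio.ByteBuffer and shows why a 2-byte string such as "33" is not a valid counter. The class CounterBytes and its methods are hypothetical helper names; the hex helper escapes every byte, whereas the shell escapes only non-printable ones.

```java
import java.nio.ByteBuffer;

final class CounterBytes {
    // Encode a long as 8 big-endian bytes (ByteBuffer is big-endian by default).
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decode 8 big-endian bytes back into a long; any other length is rejected.
    static long decode(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("counter cells must be 8 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Render every byte as \xNN for display.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("\\x%02X", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex(encode(1L)));     // \x00\x00\x00\x00\x00\x00\x00\x01
        System.out.println(decode(encode(12L))); // 12
    }
}
```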
Use Java code to increase the counter by 1 after each put; this can be used to keep a row count:
Put put = new Put(Bytes.toBytes("T003"));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Alex"));
Table table = con.getTable(TableName.valueOf("t2"));
table.put(put);
// Increment by 1. Use the same rowkey (C001) and the same family:qualifier as in the shell example
Increment incr = new Increment(Bytes.toBytes("C001"));
incr.addColumn(Bytes.toBytes("counter"), Bytes.toBytes("c"), 1);
table.increment(incr);
table.close();
The counter value can also be read on its own:
Table table = con.getTable(TableName.valueOf("t2"));
// Read the counter by passing an increment of 0; pass 1 to increase it or -1 to decrease it
long counter = table.incrementColumnValue("C001".getBytes(), // rowkey
"counter".getBytes(), // column family
"c".getBytes(), 0);
System.out.println("counter:"+counter);
table.close();
6. get
In short, get queries data. The table name and the rowkey are required arguments.
Query by rowkey:
hbase(main):055:0> t1.get "T001"
COLUMN CELL
info:name timestamp=1529391968315, value=Jack
1 row(s) in 0.0340 seconds
Query a single cell; if it does not exist, nothing is returned:
hbase(main):059:0> t1.get "T001",{COLUMN=>"info:name"}
COLUMN CELL
info:name timestamp=1529391968315, value=Jack
1 row(s) in 0.0320 seconds
hbase(main):060:0> t1.get "T001",{COLUMN=>"info:age"}
COLUMN CELL
info:age timestamp=1529392111605, value=33
1 row(s) in 0.0290 seconds
hbase(main):061:0> t1.get "T001",{COLUMN=>"info:addr"}
COLUMN CELL
0 row(s) in 0.0050 seconds
Using a filter:
hbase(main):068:0> t1.get "T001",{FILTER=>"ValueFilter(=,'binary:Jack')"}
COLUMN CELL
info:addr timestamp=1529392577775, value=Jack
info:name timestamp=1529391968315, value=Jack
Java code:
Table table = con.getTable(TableName.valueOf("t1"));
Get get = new Get("T001".getBytes());
Result result = table.get(get);
CellScanner cs = result.cellScanner();
while (cs.advance()) {
Cell cell = cs.current();
System.out.println(Bytes.toString(CellUtil.cloneFamily(cell)) +//
":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "="//
+ Bytes.toString(CellUtil.cloneValue(cell)));
}
table.close();
7. incr
Increases the value of a long cell. If the given rowkey does not exist, a new row is created.
Example:
hbase(main):054:0> incr "t1","0","info:counter"
COUNTER VALUE = 1
// Increment by 10
hbase(main):055:0> incr "t1","0","info:counter",10
COUNTER VALUE = 11
// The default increment is 1
hbase(main):056:0> t1.incr "0","info:counter"
COUNTER VALUE = 12
Read the value:
hbase(main):057:0> get_counter "t1","0","info:counter"
COUNTER VALUE = 12
Read the value with Java code:
Table table = con.getTable(TableName.valueOf("t1"));
long counter = table.incrementColumnValue("0".getBytes(), // rowkey
"info".getBytes(), // column family
"counter".getBytes(), // column qualifier
0); // an increment of 0 reads the value without changing it
table.close();
8. scan
Filtering:
Scan everything:
hbase(main):094:0> t1.scan
ROW COLUMN+CELL
T001 column=info:addr, timestamp=1529392577775, value=Jack
T001 column=info:age, timestamp=1529392111605, value=33
3 row(s) in 0.0310 seconds
Java code:
Scan scan = new Scan();
// The table also contains other rowkeys, so keep only rowkeys that start with a given prefix.
// PrefixFilter matches a literal prefix; it does not accept regular expressions.
Filter filter = new PrefixFilter("T".getBytes());
scan.setFilter(filter);
Table table = con.getTable(TableName.valueOf("t1"));
ResultScanner rs = table.getScanner(scan);
for (Result r : rs) {
String rowkey = Bytes.toString(r.getRow());
System.out.println(rowkey);
}
rs.close();
table.close();
PrefixFilter
A prefix filter.
Constructor:
public PrefixFilter(final byte [] prefix) {
this.prefix = prefix;
}
It filters only on the rowkey:
hbase(main):095:0> t1.scan FILTER=>"PrefixFilter('T')"
ROW COLUMN+CELL
T001 column=info:addr, timestamp=1529392577775, value=Jack
T001 column=info:age, timestamp=1529392111605, value=33
3 row(s) in 0.0340 seconds
Or:
hbase(main):096:0> scan "t1",{FILTER=>"PrefixFilter('T')"}
The result is the same as above.
Java code:
Scan scan = new Scan();
PrefixFilter prefixFilter = new PrefixFilter("T".getBytes());
scan.setFilter(prefixFilter);
Table table = con.getTable(TableName.valueOf("t1"));
ResultScanner rs = table.getScanner(scan);
for (Result r : rs) {
String rowkey = Bytes.toString(r.getRow());
System.out.println(rowkey);
CellScanner cs = r.cellScanner();
while (cs.advance()) {
Cell cel = cs.current();
System.out.println(Bytes.toString(CellUtil.cloneFamily(cel))//
+ ":" + Bytes.toString(CellUtil.cloneQualifier(cel)) + //
"=" + Bytes.toString(CellUtil.cloneValue(cel)));
}
}
table.close();
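Because HBase keeps rows sorted by rowkey, all rows sharing a prefix sit next to each other. The plain-Java sketch below models that with a TreeSet and returns the contiguous run of keys that start with the prefix. It only illustrates what a prefix scan returns, not how HBase implements PrefixFilter; the class PrefixScanSketch and the sample rowkeys are hypothetical, and it assumes ASCII rowkeys so that String order matches byte order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

final class PrefixScanSketch {
    // Return the rowkeys that start with prefix, in sorted order.
    static List<String> scanPrefix(TreeSet<String> rowkeys, String prefix) {
        List<String> hits = new ArrayList<>();
        // Begin at the first key >= prefix; matching keys form one contiguous run.
        for (String key : rowkeys.tailSet(prefix, true)) {
            if (!key.startsWith(prefix)) {
                break; // first non-matching key ends the run
            }
            hits.add(key);
        }
        return hits;
    }

    public static void main(String[] args) {
        TreeSet<String> rowkeys =
                new TreeSet<>(Arrays.asList("0", "C001", "T001", "T002", "T003", "U001"));
        System.out.println(scanPrefix(rowkeys, "T")); // [T001, T002, T003]
    }
}
```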
ValueFilter:
Filters on cell values.
Constructor:
public ValueFilter(final CompareOp valueCompareOp,
final ByteArrayComparable valueComparator)
Via shell commands:
hbase(main):098:0> scan "t1",{FILTER=>"ValueFilter(=,'binary:Jack')"}
ROW COLUMN+CELL
T001 column=info:addr, timestamp=1529392577775, value=Jack
T001 column=info:name, timestamp=1529391968315, value=Jack
Or:
hbase(main):099:0> t1.scan FILTER=>"ValueFilter(=,'binary:Jack')"
// More comparators: substring, regexstring, or binary can be used as the comparator prefix:
hbase(main):092:0> t1.scan FILTER=>"ValueFilter(=,'substring:Jerry')"
hbase(main):093:0> t1.scan FILTER=>"ValueFilter(=,'regexstring:Jerry')"
hbase(main):094:0> scan "t1",{FILTER=>"ValueFilter(=,'substring:Jerry')"}
hbase(main):095:0> scan "t1",{FILTER=>"ValueFilter(=,'substring:J.*')"}
hbase(main):096:0> scan "t1",{FILTER=>"ValueFilter(=,'regexstring:J.*')"}
Java code
BinaryComparator: byte-wise comparison
Scan scan = new Scan();
ValueFilter valueFilter = //
new ValueFilter(CompareOp.EQUAL, new BinaryComparator("Jerry".getBytes()));
scan.setFilter(valueFilter);
SubstringComparator: substring comparison
Scan scan = new Scan();
ValueFilter valueFilter = //
new ValueFilter(CompareOp.EQUAL, new SubstringComparator("Jerry"));
scan.setFilter(valueFilter);
RegexStringComparator: regular expression
Scan scan = new Scan();
ValueFilter valueFilter = //
new ValueFilter(CompareOp.EQUAL, new RegexStringComparator(".*e.*",Pattern.MULTILINE));
scan.setFilter(valueFilter);
Other comparators are not shown here.
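The three comparator prefixes behave differently on the same input, which matters for commands such as 'substring:J.*' above. The plain-Java sketch below imitates them without HBase, under two assumptions worth checking against your HBase version: that the substring comparison is case-insensitive, and that the regex comparison succeeds when the pattern is found anywhere in the value. ComparatorSketch and its method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;
import java.util.regex.Pattern;

final class ComparatorSketch {
    // binary: the bytes must be identical.
    static boolean binaryEquals(String value, String expected) {
        return Arrays.equals(value.getBytes(StandardCharsets.UTF_8),
                             expected.getBytes(StandardCharsets.UTF_8));
    }

    // substring: literal containment, ignoring case (assumption); no regex syntax is interpreted.
    static boolean substringMatches(String value, String needle) {
        return value.toLowerCase(Locale.ROOT).contains(needle.toLowerCase(Locale.ROOT));
    }

    // regexstring: the pattern may match anywhere in the value (assumption: find, not full match).
    static boolean regexMatches(String value, String regex) {
        return Pattern.compile(regex).matcher(value).find();
    }

    public static void main(String[] args) {
        System.out.println(binaryEquals("Jack", "Jack"));          // true
        System.out.println(substringMatches("Jerry", "J.*"));      // false: "J.*" is taken literally
        System.out.println(regexMatches("Jerry", "J.*"));          // true
        System.out.println(regexMatches("Tom and Jerry", "Jerry")); // true
    }
}
```

Under these assumptions, 'substring:J.*' looks for the literal text J.* and would not match Jerry, while 'regexstring:J.*' would.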
Using AND or OR
hbase(main):100:0> t1.scan FILTER=>"ValueFilter(=,'binary:Jack') AND PrefixFilter('T')"
COLUMNS=>["info:name"]
Shows only the specified columns:
hbase(main):121:0> t1.scan COLUMNS=>['info:name',"info:age"]
Or:
hbase(main):127:0> scan "t1",{COLUMNS=>['info:age','info:name']}
A filter can be added as well:
hbase(main):129:0> scan "t1",{COLUMNS=>['info:age','info:name'],FILTER=>"PrefixFilter('A')"}
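KeyOnlyFilter: returns only the keys
hbase(main):132:0> scan "t1",{FILTER=>"KeyOnlyFilter()"}
Note that the results of this scan contain no values.
ColumnCountGetFilter
Returns cells until the configured number of columns has been reached, then stops returning any further data. With scan this behaves somewhat oddly, and it is not obvious what it is useful for there; the HBase Javadoc describes the filter as written for Get and unsuitable as a Scan filter.
For example:
hbase(main):159:0> t1.scan
In the result above (output not shown), T002 has 5 cells. Run the following query: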
hbase(main):161:0> scan "t1",{FILTER=>"ColumnCountGetFilter(4)"}
ROW COLUMN+CELL
T002 column=info:name, timestamp=1529391977614, value=Rose
2 row(s) in 0.0090 seconds
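Because T001 has 4 cells, all of them are shown; because T002 has 5 cells, only its first 4 are shown, and nothing after that is returned.
FirstKeyOnlyFilter
From the official API documentation:
A filter that will only return the first KV from each row.
This filter can be used to more efficiently perform row count operations.
Suppose the following data exists; note that rowkey T001 has two columns: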
hbase(main):039:0> t1.scan
ROW COLUMN+CELL
2 row(s) in 0.0320 seconds
FirstKeyOnlyFilter returns only the first cell of each row. In the output below, T001 returns only info:age:
hbase(main):040:0> t1.scan FILTER=>"FirstKeyOnlyFilter()"
ROW COLUMN+CELL
FirstKeyOnlyFilter usage test: see the count() method under Method 1 in the count section, which counts rows with Scan plus FirstKeyOnlyFilter.
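RowFilter
RowFilter constructor:
public RowFilter(final CompareOp rowCompareOp,
final ByteArrayComparable rowComparator)
The first parameter is the comparison operator, an enum (CompareOp) with the values LESS, LESS_OR_EQUAL, EQUAL, NOT_EQUAL, GREATER_OR_EQUAL, GREATER, and NO_OP.
The second parameter is the value to compare against. Subclasses of ByteArrayComparable include:
BinaryComparator: byte-wise comparison
// Use RowFilter with BinaryComparator to find rowkey T001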
RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, //
new BinaryComparator("T001".getBytes()));
Scan sc = new Scan();
sc.setFilter(rowFilter);
Table table = con.getTable(TableName.valueOf("t1"));
ResultScanner rs = table.getScanner(sc);
for (Result r : rs) {
String rowkey = Bytes.toString(r.getRow());
System.out.println(rowkey);
CellScanner cs = r.cellScanner();
while (cs.advance()) {
Cell cel = cs.current();
System.out.println(Bytes.toString(CellUtil.cloneFamily(cel))//
+ ":" + Bytes.toString(CellUtil.cloneQualifier(cel)) + //
"=" + Bytes.toString(CellUtil.cloneValue(cel)));
}
}
table.close();
In the shell, the same query uses the binary prefix to request a byte-wise comparison:
hbase(main):083:0> scan "t1",{FILTER=>"RowFilter(=,'binary:T001')"}
ROW COLUMN+CELL
1 row(s) in 0.0180 seconds
BinaryPrefixComparator: byte prefix
The following finds rowkeys that start with T:
RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, //
new BinaryPrefixComparator("T".getBytes()));
In the shell, use the binaryprefix prefix:
hbase(main):085:0> scan "t1",{FILTER=>"RowFilter(=,'binaryprefix:T')"}
ROW COLUMN+CELL
T001 column=info:name, timestamp=1529635711054, value=Jerry
SubstringComparator: substring match
RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, //
new SubstringComparator("T001"));
In the shell, use the substring prefix:
hbase(main):086:0> scan "t1",{FILTER=>"RowFilter(=,'substring:T001')"}
ROW COLUMN+CELL
1 row(s) in 0.0300 seconds
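RegexStringComparator: regular expression
The following uses a regular expression and likewise finds the rowkeys beginning with T in this table (the pattern is not anchored, so write ^T.* if it must match only at the start of the rowkey):
RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, //
new RegexStringComparator("T.*"));
Using a regular expression in the shell: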
hbase(main):087:0> scan "t1",{FILTER=>"RowFilter(=,'regexstring:T.*')"}
ROW COLUMN+CELL
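PageFilter
Returns the first N rows (the limit is applied separately on each region server, so a scan that spans several regions may return more than N):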
hbase(main):179:0> t1.scan FILTER=>"PageFilter(1)"
ROW COLUMN+CELL
It can also be combined with STARTROW and STOPROW:
hbase(main):186:0> scan "t1",{FILTER=>"PageFilter(2)",STARTROW=>"T002",STOPROW=>"T003"}
1 row(s) in 0.0080 seconds
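The scan above returns one row even though PageFilter(2) allows two, because STARTROW is inclusive and STOPROW is exclusive, so only T002 lies in the range. A minimal plain-Java sketch of that range logic, using a TreeSet in place of the table; RangeScanSketch and its scan method are hypothetical names, and the rowkeys are sample data:

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;

final class RangeScanSketch {
    // Keys in [startRow, stopRow), capped at pageSize results.
    static List<String> scan(TreeSet<String> rowkeys, String startRow, String stopRow, int pageSize) {
        return rowkeys.subSet(startRow, true, stopRow, false) // start inclusive, stop exclusive
                      .stream()
                      .limit(pageSize)
                      .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        TreeSet<String> rowkeys = new TreeSet<>(Arrays.asList("T001", "T002", "T003"));
        System.out.println(scan(rowkeys, "T002", "T003", 2)); // [T002]
        System.out.println(scan(rowkeys, "T001", "T004", 2)); // [T001, T002]
    }
}
```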
ValueFilter
Filtering on values:
hbase(main):164:0> t1.scan FILTER=>"ValueFilter(=,'binary:Jack')"
ColumnPaginationFilter
LIMIT: restricting the scan range
LIMIT=> caps the number of rows returned; STARTROW=> sets the rowkey at which the scan starts.
hbase(main):169:0> scan "t1",{LIMIT=>1,STARTROW=>'T002'}
1 row(s) in 0.0150 seconds
This can be used for paged queries.
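One way to page with LIMIT and STARTROW is to start each page just after the last rowkey of the previous one. Since STARTROW is inclusive, a common trick is to append a zero byte to the last rowkey, which gives the smallest key that sorts strictly after it. The plain-Java sketch below shows the loop on a TreeSet; PagingSketch, page, nextStartRow, and allPages are hypothetical names, and no HBase call is made.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;

final class PagingSketch {
    // One page: up to limit keys starting at startRow (inclusive).
    static List<String> page(TreeSet<String> rowkeys, String startRow, int limit) {
        return rowkeys.tailSet(startRow, true).stream().limit(limit).collect(Collectors.toList());
    }

    // Smallest key strictly greater than lastRow: lastRow followed by a zero character.
    static String nextStartRow(String lastRow) {
        return lastRow + '\u0000';
    }

    // Walk the whole key set page by page.
    static List<List<String>> allPages(TreeSet<String> rowkeys, int limit) {
        List<List<String>> pages = new ArrayList<>();
        String start = ""; // the empty string sorts before every rowkey
        while (true) {
            List<String> current = page(rowkeys, start, limit);
            if (current.isEmpty()) {
                break;
            }
            pages.add(current);
            start = nextStartRow(current.get(current.size() - 1));
        }
        return pages;
    }

    public static void main(String[] args) {
        TreeSet<String> rowkeys = new TreeSet<>(Arrays.asList("T001", "T002", "T003"));
        System.out.println(allPages(rowkeys, 2)); // [[T001, T002], [T003]]
    }
}
```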