hbase多个scan能一起查吗
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hbase多个scan能一起查吗相关的知识,希望对你有一定的参考价值。
可以一起查。1.scan原理
HBase的查询实现只提供两种方式:
1、按指定RowKey 获取唯一一条记录,get方法(org.apache.hadoop.hbase.client.Get)
Get 的方法处理分两种 : 设置了ClosestRowBefore 和没有设置的rowlock .主要是用来保证行的事务性,即每个get 是以一个row 来标记的.一个row中可以有很多family 和column.
2、按指定的条件获取一批记录,scan方法(org.apache.Hadoop.hbase.client.Scan)实现条件查询功能使用的就是scan 方式.
1)scan 可以通过setCaching 与setBatch 方法提高速度(以空间换时间);
2)scan 可以通过setStartRow 与setEndRow 来限定范围([start,end)start 是闭区间,
end 是开区间)。范围越小,性能越高。
3)、scan 可以通过setFilter 方法添加过滤器,这也是分页、多条件查询的基础。
HBase中scan并不像大家想象的一样直接发送一个命令过去,服务器就将满足扫描条件的所有数据一次性返回给客户端。而实际上它的工作原理如下图所示:
上图右侧是HBase scan的客户端代码,其中for循环中每次遍历ResultScanner对象获取一行记录,实际上在客户端层面都会调用一次next请求。next请求整个流程可以分为如下几个步骤:
next请求首先会检查客户端缓存中是否存在还没有读取的数据行,如果有就直接返回,否则需要将next请求给HBase服务器端(RegionServer)。
如果客户端缓存已经没有扫描结果,就会将next请求发送给HBase服务器端。默认情况下,一次next请求仅可以请求100行数据(或者返回结果集总大小不超过2M)
服务器端接收到next请求之后就开始从BlockCache、HFile以及memcache中一行一行进行扫描,扫描的行数达到100行之后就返回给客户端,客户端将这100条数据缓存到内存并返回一条给上层业务。
HBase 每次 scan 的数据量可能会比较大,客户端不会一次性全部把数据从服务端拉回来。而是通过多次 rpc 分批次的拉取。类似于 TCP 协议里面一段一段的传输,可以做到细粒度的流量控制。至于如何调优,控制每次 rpc 拉取的数据量,就可以通过三个参数来控制。
.setCaching => .setNumberOfRowsFetchSize (客户端每次 rpc fetch 的行数)
.setBatch => .setColumnsChunkSize (客户端每次获取的列数)
.setMaxResultSize => .setMaxResultByteSize (客户端缓存的最大字节数)
hbase.client.scanner.caching - (setCaching):HBase-0.98 默认值为为 100,HBase-1.2 默认值为 2147483647,即 Integer.MAX_VALUE。Scan.next() 的一次 RPC 请求 fetch 的记录条数。配置建议:这个参数与下面的setMaxResultSize配合使用,在网络状况良好的情况下,自定义设置不宜太小, 可以直接采用默认值,不配置。
setBatch() 配置获取的列数,假如表有两个列簇 cf,info,每个列簇5个列。这样每行可能有10列了,setBatch() 可以控制每次获取的最大列数,进一步从列级别控制流量。配置建议:当列数很多,数据量大时考虑配置此参数,例如100列每次只获取50列。一般情况可以默认值(-1 不受限)。
hbase.client.scanner.max.result.size - (setMaxResultSize):HBase-0.98 无该项配置,HBase-1.2 默认值为 210241024,即 2M。Scan.next() 的一次 RPC 请求 fetch 的数据量大小,目前 HBase-1.2 在 Caching 为默认值(Integer Max)的时候,实际使用这个参数控制 RPC 次数和流量。配置建议:如果网络状况较好(万兆网卡),scan 的数据量非常大,可以将这个值配置高一点。如果配置过高:则可能 loadCache 速度比较慢,导致 scan timeout 异常
hbase.server.scanner.max.result.size:服务端配置。HBase-0.98 无该项配置,HBase-1.2 新增,默认值为 10010241024,即 100M。该参数表示当 Scan.next() 发起 RPC 后,服务端返回给客户端的最大字节数,防止 Server OOM。
要计算一次扫描操作的RPC请求的次数,用户需要先计算出行数和每行列数的乘积。然后用这个值除以批量大小和每行列数中较小的那个值。最后再用除得的结果除以扫描器缓存值。 用数学公式表示如下:
RPC 返回的个数 = (row数 * 每行的列数)/ Min(每行列数,Batch大小) / Caching大小
Result 返回的个数 =( row数 * 每行的列数 )/ Min(每行列数,Batch大小)
复制
2.Hbase Shell中使用
在hbase shell中查询数据,可以在hbase shell中直接使用过滤器:
# hbase shell > scan 'tablename',STARTROW=>'start',COLUMNS=>['family:qualifier'],FILTER=>"ValueFilter(=,'substring:88')"
复制
如上命令所示,查询的是表名为testByCrq,过滤方式是通过value过滤,匹配出value含111的数据。
因在hbase shell中一些操作比较麻烦(比如删除字符需先按住ctrl在点击退格键),且退出后,查询的历史纪录不可考,故如下方式是比较方便的一种:
# echo "scan 'testByCrq', FILTER=>\"ValueFilter(=,'substring:111')\"" | hbase shell
复制
如上命令,可在bash中直接使用,表名是testByCrq,过滤方式是通过value过滤,匹配出value含111的数据,中间的"需要用\转义。
建表
create 'test1', 'lf', 'sf'
-- lf: column family of LONG values (binary value)
-- sf: column family of STRING values
复制
导入数据
put 'test1', 'user1|ts1', 'sf:c1', 'sku1'
put 'test1', 'user1|ts2', 'sf:c1', 'sku188'
put 'test1', 'user1|ts3', 'sf:s1', 'sku123'
put 'test1', 'user2|ts4', 'sf:c1', 'sku2'
put 'test1', 'user2|ts5', 'sf:c2', 'sku288'
put 'test1', 'user2|ts6', 'sf:s1', 'sku222'
put 'test1', 'user3|ts7', 'lf:c1', 12345
put 'test1', 'user3|ts8', 'lf:c1', 67890
复制
1.限制条件
scan 'hbase:meta'
scan 'hbase:meta', COLUMNS => 'info:regioninfo'
scan 'ns1:t1', COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'
scan 't1', COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'
scan 't1', COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]
scan 't1', REVERSED => true
复制
2.Filter过滤
1.rowkey查询
rowkey为user1开头的
scan 'test1', FILTER => "PrefixFilter ('user1')"
ROW COLUMN+CELL
user1|ts1 column=sf:c1, timestamp=1409122354868, value=sku1
user1|ts2 column=sf:c1, timestamp=1409122354918, value=sku188
user1|ts3 column=sf:s1, timestamp=1409122354954, value=sku123
复制
FirstKeyOnlyFilter: 一个rowkey可以有多个version,同一个rowkey的同一个column也会有多个的值, 只拿出key中的第一个column的第一个version KeyOnlyFilter: 只要key,不要value
scan 'test1', FILTER=>"FirstKeyOnlyFilter() AND ValueFilter(=,'binary:sku188') AND KeyOnlyFilter()"
ROW COLUMN+CELL
user1|ts2 column=sf:c1, timestamp=1409122354918, value=
复制
查询rowkey里面包含ts3的
scan 'test1', FILTER=>"RowFilter(=,'substring:ts3')"
ROW COLUMN+CELL
user1|ts3 column=sf:s1, timestamp=1554865926412, value=sku123
复制
从user1|ts2开始,找到所有的rowkey以user1开头的
scan 'test1', STARTROW=>'user1|ts2', FILTER => "PrefixFilter ('user1')"
ROW COLUMN+CELL
user1|ts2 column=sf:c1, timestamp=1409122354918, value=sku188
user1|ts3 column=sf:s1, timestamp=1409122354954, value=sku123
复制
从user1|ts2开始,找到所有的到rowkey以user2开头
scan 'test1', STARTROW=>'user1|ts2', STOPROW=>'user2'
ROW COLUMN+CELL
user1|ts2 column=sf:c1, timestamp=1409122354918, value=sku188 user1|ts3 column=sf:s1, timestamp=1409122354954, value=sku123
复制
2.值查询
谁的值=sku188
scan 'test1', FILTER=>"ValueFilter(=,'binary:sku188')"
ROW COLUMN+CELL
user1|ts2 column=sf:c1, timestamp=1409122354918, value=sku188
复制
谁的值包含88
scan 'test1', FILTER=>"ValueFilter(=,'substring:88')"
ROW COLUMN+CELL
user1|ts2 column=sf:c1, timestamp=1409122354918, value=sku188
user2|ts5 column=sf:c2, timestamp=1409122355030, value=sku288
复制
值小于等于20000
scan 'test1', FILTER=>"ValueFilter(<=,'binary:20000')"
ROW COLUMN+CELL
user3|ts7 column=lf:c1, timestamp=1554866187587, value=12345
复制
注意:如果查询值大于20000,会查出所有值,因为“sku188”等值转为二进制后都大于20000。
substring不能使用小于等于等符号。
3.列查询
column为c2,值包含88的用户
scan 'test1', FILTER=>"ColumnPrefixFilter('c2') AND ValueFilter(=,'substring:88')"
ROW COLUMN+CELL
user2|ts5 column=sf:c2, timestamp=1409122355030, value=sku288
复制
通过搜索进来的(column为s)值包含123或者222的用户
scan 'test1', FILTER=>"ColumnPrefixFilter('s') AND ( ValueFilter(=,'substring:123') OR ValueFilter(=,'substring:222') )"
ROW COLUMN+CELL
user1|ts3 column=sf:s1, timestamp=1409122354954, value=sku123
user2|ts6 column=sf:s1, timestamp=1409122355970, value=sku222
复制
列族查询
scan 'test1', FILTER=>"FamilyFilter(=,'substring:lf')"
ROW COLUMN+CELL
user3|ts7 column=lf:c1, timestamp=1554866187587, value=12345
user3|ts8 column=lf:c1, timestamp=1554866294485, value=67890
复制
4.时间戳
scan 'test1',FILTER=>"TimestampsFilter(1448069941270,1548069941230)"
复制
3.java查询
过滤器
HBase 的基本 API,包括增、删、改、查等。
增、删都是相对简单的操作,与传统的 RDBMS 相比,这里的查询操作略显苍白,只能根据特性的行键进行查询(Get)或者根据行键的范围来查询(Scan)。
HBase 不仅提供了这些简单的查询,而且提供了更加高级的过滤器(Filter)来查询。
过滤器的两类参数
过滤器可以根据列族、列、版本等更多的条件来对数据进行过滤,基于 HBase 本身提供的三维有序(行键,列,版本有序),这些过滤器可以高效地完成查询过滤的任务,带有过滤器条件的 RPC 查询请求会把过滤器分发到各个 RegionServer(这是一个服务端过滤器),这样也可以降低网络传输的压力。
使用过滤器至少需要两类参数:
一类是抽象的操作符
HBase 提供了枚举类型的变量来表示这些抽象的操作符:
LESS
LESS_OR_EQUAL
EQUAL
NOT_EQUAL
GREATER_OR_EQUAL
GREATER
NO_OP
另一类是比较器
代表具体的逻辑,例如字节级的比较,字符串级的比较等。
参数基础
有两个参数类在各类Filter中经常出现,统一介绍下:
(1)比较运算符 CompareFilter.CompareOp
比较运算符用于定义比较关系,可以有以下几类值供选择:
EQUAL 相等
GREATER 大于
GREATER_OR_EQUAL 大于等于
LESS 小于
LESS_OR_EQUAL 小于等于
NOT_EQUAL 不等于
(2)比较器 ByteArrayComparable
通过比较器可以实现多样化目标匹配效果,比较器有以下子类可以使用:
BinaryComparator 匹配完整字节数组
BinaryPrefixComparator 匹配字节数组前缀
BitComparator
NullComparator
RegexStringComparator 正则表达式匹配
SubstringComparator 子串匹配
1,FilterList
FilterList 代表一个过滤器链,它可以包含一组即将应用于目标数据集的过滤器,过滤器间具有“与” FilterList.Operator.MUST_PASS_ALL 和“或” FilterList.Operator.MUST_PASS_ONE 关系。
官网实例代码,两个“或”关系的过滤器的写法:
FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE); //数据只要满足一组过滤器中的一个就可以
SingleColumnValueFilter filter1 = new SingleColumnValueFilter(cf,column,CompareOp.EQUAL,Bytes.toBytes("my value"));
list.add(filter1);
SingleColumnValueFilter filter2 = new SingleColumnValueFilter(cf,column,CompareOp.EQUAL,Bytes.toBytes("my other value"));
list.add(filter2);
Scan scan = new Scan();
scan.setFilter(list);
复制

2,列值过滤器--SingleColumnValueFilter
SingleColumnValueFilter 用于测试列值相等 (CompareOp.EQUAL ), 不等 (CompareOp.NOT_EQUAL),或单侧范围 (e.g., CompareOp.GREATER)。
构造函数:
(1)比较的关键字是一个字符数组
SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value)
(2)比较的关键字是一个比较器(比较器下一小节做介绍)
SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, ByteArrayComparable comparator)
注意:根据列的值来决定这一行数据是否返回,落脚点在行,而不是列。我们可以设置filter.setFilterIfMissing(true);如果为true,当这一列不存在时,不会返回,如果为false,当这一列不存在时,会返回所有的列信息
测试表user内容如下:
Table table = connection.getTable(TableName.valueOf("user"));
SingleColumnValueFilter scvf= new SingleColumnValueFilter(Bytes.toBytes("account"), Bytes.toBytes("name"),
CompareOp.EQUAL,"zhangsan".getBytes());
scvf.setFilterIfMissing(true); //默认为false, 没有此列的数据也会返回 ,为true则只返回name=lisi的数据
Scan scan = new Scan();
scan.setFilter(scvf);
ResultScanner resultScanner = table.getScanner(scan);
for (Result result : resultScanner)
List<Cell> cells= result.listCells();
for (Cell cell : cells)
String row = Bytes.toString(result.getRow());
String family1 = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println("[row:"+row+"],[family:"+family1+"],[qualifier:"+qualifier+"]"+ ",[value:"+value+"],[time:"+cell.getTimestamp()+"]");
复制

如果setFilterIfMissing(true), 有匹配只会返回当前列所在的行数据,基于行的数据 country 也返回了,因为他么你的rowkey是相同的
[row:zhangsan_1495527850824],[family:account],[qualifier:country],[value:china],[time:1495636452285]
[row:zhangsan_1495527850824],[family:account],[qualifier:name],[value:zhangsan],[time:1495556648729]
复制
如果setFilterIfMissing(false),有匹配的列的值相同会返回,没有此列的 name的也会返回,, 不匹配的name则不会返回。
下面 红色是匹配列内容的会返回,其他的不是account:name列也会返回,, name=lisi的不会返回,因为不匹配。
[row:lisi_1495527849910],[family:account],[qualifier:idcard],[value:42963319861234561230],[time:1495556647872]
[row:lisi_1495527850111],[family:account],[qualifier:password],[value:123451231236],[time:1495556648013]
[row:lisi_1495527850114],[family:address],[qualifier:city],[value:黄埔],[time:1495556648017]
[row:lisi_1495527850136],[family:address],[qualifier:province],[value:shanghai],[time:1495556648041]
[row:lisi_1495527850144],[family:info],[qualifier:age],[value:21],[time:1495556648045]
[row:lisi_1495527850154],[family:info],[qualifier:sex],[value:女],[time:1495556648056]
[row:lisi_1495527850159],[family:userid],[qualifier:id],[value:002],[time:1495556648060]
[row:wangwu_1495595824517],[family:userid],[qualifier:id],[value:009],[time:1495624624131]
[row:zhangsan_1495527850759],[family:account],[qualifier:idcard],[value:9897645464646],[time:1495556648664]
[row:zhangsan_1495527850759],[family:account],[qualifier:passport],[value:5689879898],[time:1495636370056]
[row:zhangsan_1495527850824],[family:account],[qualifier:country],[value:china],[time:1495636452285]
[row:zhangsan_1495527850824],[family:account],[qualifier:name],[value:zhangsan],[time:1495556648729]
[row:zhangsan_1495527850951],[family:address],[qualifier:province],[value:guangdong],[time:1495556648855]
[row:zhangsan_1495527850975],[family:info],[qualifier:age],[value:100],[time:1495556648878]
[row:zhangsan_1495527851080],[family:info],[qualifier:sex],[value:男],[time:1495556648983]
[row:zhangsan_1495527851095],[family:userid],[qualifier:id],[value:001],[time:1495556648996]
复制

3 键值元数据
由于HBase 采用键值对保存内部数据,键值元数据过滤器评估一行的键(ColumnFamily:Qualifiers)是否存在
3.1. 基于列族过滤数据的FamilyFilter
构造函数:
FamilyFilter(CompareFilter.CompareOp familyCompareOp, ByteArrayComparable familyComparator)
代码如下:
public static ResultScanner getDataFamilyFilter(String tableName,String family) throws IOException
Table table = connection.getTable(TableName.valueOf("user"));
FamilyFilter ff = new FamilyFilter(CompareOp.EQUAL ,
new BinaryComparator(Bytes.toBytes("account"))); //表中不存在account列族,过滤结果为空
// new BinaryPrefixComparator(value) //匹配字节数组前缀
// new RegexStringComparator(expr) // 正则表达式匹配
// new SubstringComparator(substr)// 子字符串匹配
Scan scan = new Scan();
// 通过scan.addFamily(family) 也可以实现此操作
scan.setFilter(ff);
ResultScanner resultScanner = table.getScanner(scan);
return resultScanner;
复制

测试结果:查询的都是account列簇的内容
[row:lisi_1495527849910],[family:account],[qualifier:idcard],[value:42963319861234561230],[time:1495556647872]
[row:lisi_1495527850081],[family:account],[qualifier:name],[value:lisi],[time:1495556647984]
[row:lisi_1495527850111],[family:account],[qualifier:password],[value:123451231236],[time:1495556648013]
[row:zhangsan_1495527850759],[family:account],[qualifier:idcard],[value:9897645464646],[time:1495556648664]
[row:zhangsan_1495527850759],[family:account],[qualifier:passport],[value:5689879898],[time:1495636370056]
[row:zhangsan_1495527850824],[family:account],[qualifier:country],[value:china],[time:1495636452285]
[row:zhangsan_1495527850824],[family:account],[qualifier:name],[value:zhangsan],[time:1495556648729]
复制

3.2. 基于限定符Qualifier(列)过滤数据的QualifierFilter
构造函数:
QualifierFilter(CompareFilter.CompareOp op, ByteArrayComparable qualifierComparator)
Table table = connection.getTable(TableName.valueOf("user"));
QualifierFilter ff = new QualifierFilter(
CompareOp.EQUAL , new BinaryComparator(Bytes.toBytes("name")));
// new BinaryPrefixComparator(value) //匹配字节数组前缀
// new RegexStringComparator(expr) // 正则表达式匹配
// new SubstringComparator(substr)// 子字符串匹配
Scan scan = new Scan();
// 通过scan.addFamily(family) 也可以实现此操作
scan.setFilter(ff);
ResultScanner resultScanner = table.getScanner(scan);
复制

测试结果:只返回 name 的列内容
[row:lisi_1495527850081],[family:account],[qualifier:name],[value:lisi],[time:1495556647984]
[row:zhangsan_1495527850824],[family:account],[qualifier:name],[value:zhangsan],[time:1495556648729]
复制
3.3. 基于列名(即Qualifier)前缀过滤数据的ColumnPrefixFilter
( 该功能用QualifierFilter也能实现 )
构造函数:
ColumnPrefixFilter(byte[] prefix)
Table table = connection.getTable(TableName.valueOf("user"));
ColumnPrefixFilter ff = new ColumnPrefixFilter(Bytes.toBytes("name"));
Scan scan = new Scan();
// 通过QualifierFilter的 newBinaryPrefixComparator也可以实现
scan.setFilter(ff);
ResultScanner resultScanner = table.getScanner(scan);
复制

返回结果:
[row:lisi_1495527850081],[family:account],[qualifier:name],[value:lisi],[time:1495556647984]
[row:zhangsan_1495527850824],[family:account],[qualifier:name],[value:zhangsan],[time:1495556648729]
复制
3.4. 基于多个列名(即Qualifier)前缀过滤数据的MultipleColumnPrefixFilter
MultipleColumnPrefixFilter 和 ColumnPrefixFilter 行为差不多,但可以指定多个前缀
byte[][] prefixes = new byte[][] Bytes.toBytes("name"), Bytes.toBytes("age");
//返回所有行中以name或者age打头的列的数据
MultipleColumnPrefixFilter ff = new MultipleColumnPrefixFilter(prefixes);
Scan scan = new Scan();
scan.setFilter(ff);
ResultScanner rs = table.getScanner(scan);
复制

结果:
[row:lisi_1495527850081],[family:account],[qualifier:name],[value:lisi],[time:1495556647984]
[row:lisi_1495527850144],[family:info],[qualifier:age],[value:21],[time:1495556648045]
[row:zhangsan_1495527850824],[family:account],[qualifier:name],[value:zhangsan],[time:1495556648729]
[row:zhangsan_1495527850975],[family:info],[qualifier:age],[value:100],[time:1495556648878]
复制
3.5. 基于列范围过滤数据ColumnRangeFilter
构造函数:
ColumnRangeFilter(byte[] minColumn, boolean minColumnInclusive, byte[] maxColumn, boolean maxColumnInclusive)
参数解释:
minColumn - 列范围的最小值,如果为空,则没有下限;
minColumnInclusive - 列范围是否包含minColumn ;
maxColumn - 列范围最大值,如果为空,则没有上限;
maxColumnInclusive - 列范围是否包含maxColumn 。
代码:
Table table = connection.getTable(TableName.valueOf("user"));
byte[] startColumn = Bytes.toBytes("a");
byte[] endColumn = Bytes.toBytes("d");
//返回所有列中从a到d打头的范围的数据,
ColumnRangeFilter ff = new ColumnRangeFilter(startColumn, true, endColumn, true);
Scan scan = new Scan();
scan.setFilter(ff);
ResultScanner rs = table.getScanner(scan);
复制

结果:返回列名开头是a 到 d的所有列数据
[row:lisi_1495527850114],[family:address],[qualifier:city],[value:黄埔],[time:1495556648017]
[row:lisi_1495527850144],[family:info],[qualifier:age],[value:21],[time:1495556648045]
[row:zhangsan_1495527850824],[family:account],[qualifier:country],[value:china],[time:1495636452285]
[row:zhangsan_1495527850975 参考技术A HBase中支持一次性执行多个Scan操作。要实现这个功能,可以使用MultiplexTableMapper类,它是TableMapper的子类。MultiplexTableMapper会生成修复数量的线程去执行使用了不同Scan对象的map()函数。可以按以下步骤进行操作:
1. 创建多个Scan对象:创建多个Scan对象以支持多Scan操作。
2. 指定Scan操作:指定每个Scan对象中所需的行范围、过滤器和其他参数等。
3. 创建多份Scan配置:创建多个Scan配置文件,其中每个配置文件将使用一个Scan对象。
4. 创建自定义Mapper类:创建自定义的Mapper类,并将MultiplexTableMapper类作为父类。在map()方法中调用TableRecordReaderImpl类并传递适当的Scan配置文件。
5. 定义Scan配置文件:定义每个Scan配置文件中的Scan对象。
6. 运行Hadoop作业:将多个Scan配置文件作为参数提交Hadoop作业。
这样可以同时执行多个Scan操作,提高检索性能,避免枚举过程中的数据重读和数据冗余的问题。 参考技术B HBase支持在一个Scan中设置多个列族或列限定符进行查询,但是不支持将多个Scan合并成一个Scan进行查询。
如果需要在一个查询中使用多个Scan,可以使用HBase的MapReduce功能来实现。MapReduce可以将多个Scan作为输入,然后将它们合并并处理数据,最后将结果输出。基本上可以完成所有单独执行Scan时能够完成的操作,同时还支持更复杂的操作,例如合并、排序和过滤等。
具体实现方式是,在MapReduce作业的Mapper阶段中,可以通过TableMapReduceUtil.initTableMapperJob()方法传入多个Scan对象。这样,每个Mapper任务都会处理多个Scan,并生成相应的键值对。在Reducer阶段,可以将中间结果合并,得到最终的输出结果。
需要注意的是,在使用多个Scan进行查询时,需要谨慎选择Scan的范围和条件,以保证查询效率和正确性。同时,应该尽可能地避免对过多的行或列进行查询,以免影响查询性能。 参考技术C 在 HBase 中,可以使用 MultiScan 操作同时操作多个 Scan 对象。通过 MultiScan 操作,您可以同时从多个 HBase 表中检索数据,并将结果合并为单个结果集。这使得 HBase 在查询大量数据时变得有效。
下面是使用 Java API 进行 MultiScan 操作的简单示例:
```java
List<Scan> scanList = new ArrayList<>();
Scan scan1 = new Scan(Bytes.toBytes("rowkey1"), Bytes.toBytes("rowkey2"));
Scan scan2 = new Scan(Bytes.toBytes("rowkey3"), Bytes.toBytes("rowkey4"));
scanList.add(scan1);
scanList.add(scan2);
ResultScanner scanner = table.getScanner(scanList);
for (Result result : scanner)
// 处理返回的结果
scanner.close();
```
以上示例定义了两个 Scan 对象并添加到 scanList 中,在 table 上进行 MultiScan 操作并遍历结果集。您可以根据您的具体业务需求,定义多个 Scan 对象并添加到 scanList 中,这些 Scan 对象的结果将合并在一起并返回一个结果集合。 参考技术D 可以使用HBase的批量操作API,将多个Scan合并为一个批量操作。这可以极大地提高查询效率,减少RPC次数。下面是一个示例代码:
```java
List<Scan> scans = new ArrayList<>();
// 添加多个Scan
scans.add(scan1);
scans.add(scan2);
scans.add(scan3);
Result[] results = table.get(scans); // 使用批量操作API执行多个Scan
for (Result result : results)
// 对每个结果进行处理
```
上述代码中,使用了`get(List<Scan>)`方法进行批量查询,将多个Scan对象添加到一个列表中,传递给该方法,执行后得到多个结果。需要注意的是,批量操作API只能在HBase 1.3版本及以上使用。同时,将多个Scan合并为一个批量操作,可能会增加资源消耗,因此需要根据实际情况进行权衡。
HBase优化——读写优化
Hbase2.0查询优化
1)设置scan缓存
HBase中Scan查询可以设置缓存,方法是setCaching(),这样可以有效的减少服务端与客户端的交互,更有效的提升扫描查询的性能。
Scan scan = new Scan(); scan.setCaching(1000);
2)显示的指定列
当使用Scan或者GET获取大量的行时,最好指定所需要的列,因为服务端通过网络传输到客户端,数据量太大可能是瓶颈。如果能有效过滤部分数据,能很大程度的减少网络I/O的花费。
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("t"));
scan.addFamily(Bytes.toBytes("f"));
Get get = new Get(Bytes.toBytes("demo"));
get.addColumn(Bytes.toBytes("f"), Bytes.toBytes("t"));
get.addFamily(Bytes.toBytes("f"));
一般情况下用:
scan.addColumn(…)
3)关闭ResultScanner
如果在使用 table.getScanner之后,忘记关闭该类,它会一直和服务端保持连接,资源无法释放,从而导致服务端的某些资源不可用。
所以在用完之后,需要执行关闭操作,这点与JDBS操作MySQL类似。
Scan scan = new Scan() ResultScanner scanner = table.getScanner(scan) for (Result rs <- scanner) { do something..... } scanner.close()
4) 禁用块缓存
如果批量进行全表扫描,默认是有缓存的,如果此时有缓存,会降低扫描的效率。
Scan scan = new Scan() scan.setCacheBlocks(true|false);
对于经常读到的数据,建议使用默认值,开启块缓存
5) 缓存查询结果
对于频繁查询HBase的应用场景,可以考虑在应用程序和Hbase之间做一层缓存系统(redis等),新的查询先去缓存查,缓存没有再去查Hbase。
6)设定scan的查询范围
如果可以明确的扫描范围,建议设置scan的StartRow和StopRow
Scan scan = new Scan(); scan.withStartRow(Bytes.toBytes("0000")); scan.withStopRow(Bytes.toBytes("9999"));
7) 批量get执行
如果可以确定rowkey的值,在批量查询的时候建议用批量Get方式
List<Get> gets = new ArrayList<Get>(); gets.add(new Get(Bytes.toBytes("000"))); gets.add(new Get(Bytes.toBytes("111"))); gets.add(new Get(Bytes.toBytes("222"))); gets.add(new Get(Bytes.toBytes("333"))); table.get(gets)
Hbase2.0写入优化
写也是Hbase常有的操作之一,并且Hbase在写入操作上有着其他NoSQL无法比拟的优势,下面讲如何优化写入操作
1)关闭写WAL日志
一般为了保证系统的高可用性,WAL日志默认是开启状态,WAL主要用于灾难恢复的,如果应用可以容忍一定的数据丢失风险,可以在写数据的时候,关闭写WAL。
风险: 当RegionServer宕机时,写入的数据出现丢失,且无法恢复
2)设置AutoFlush
Htable有一个属性是AutoFlush,该属性用于支持客户端的批量更新,默认是true,当客户端每收到一条数据,立刻发送到服务端,如果设置为false,当客户端提交put请求时候,先将该请求在客户端缓存,到达阈值的时候或者执行hbase.flushcommits(),才向RegionServer提交请求。
table.setAutoFlush(false); table.setWriteBufferSize( 12 * 1024 * 1024 );
3)预创建Region
一般表刚开始只有一个Region,插入该表的数据都会保存在此Region中,插入该表的所有塑化剂都会保存在该Region中,当到达一定的阈值时,才发生分裂。 这样开始时刻针对该表的写操作都集中在某台服务器上,造成这台服务器的压力很紧张,同时对整个集群资源的浪费
建议刚开始的时候预创建Region,可以使用Hbase自带的RegionSplitter
4)延迟日志flush
默认写入操作,首先写入WAL,并且在1S内写入HDFS,这个时间默认是1S,可以通过参数配置
hbase.regionserver.optionallogflushinterval
可以配置大一点的值,比如5s,这段时间数据会保留在内存中,直到RegionServer周期性的执行flush操作。
以上是关于hbase多个scan能一起查吗的主要内容,如果未能解决你的问题,请参考以下文章