基于hbase进行用户画像查询
Posted Hadoop大数据之路
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于hbase进行用户画像查询相关的知识,希望对你有一定的参考价值。
用户画像是根据用户社会属性、生活习惯和消费行为等信息而抽象出的一个标签化的用户模型。构建用户画像的核心工作是给用户贴“标签”,而标签是通过对用户信息分析而来的高度精炼的特征标识,用户画像越精细对“精准营销”起的作用就越大。本文对用户画像的建设不做讨论,而是探讨一下如何存储及查询用户标签数据。用户标签的输出方式主要有两种:1、查询用户对应的标签属性;2、查询组合标签下的用户。,从hbase的应用特性及其特点可以满足第一点要求,那么根据组合标签查询用户应该如何处理呢?目前比较常用的处理方案:hbase+phoenix、hbase+elasticsearch,这两种方案后续作介绍,本文主要是探讨基于hbase的filter过滤器进行组合标签查询是否可行?
一、比较器介绍
有两个参数类在各类Filter中经常出现,下面统一介绍一下:
1)比较运算符CompareFilter.CompareOp,主要用户定义比较关系,可以有以下几类值:
EQUAL 相等
GREATER 大于
GREATER_OR_EQUAL 大于等于
LESS 小于
LESS_OR_EQUAL 小于等于
NOT_EQUAL 不等于
2)比较器ByteArrayComparable,通过比较器可以实现多样化匹配效果,比较器有以下子类可以使用:
BinaryComparator 匹配完整字节数组
BinaryPrefixComparator 匹配字节数组前缀
BitComparator 位操作比较器
NullComparator 空值比较器
RegexStringComparator 正则表达式比较器
SubstringComparator 子串匹配
二、过滤器介绍
1)FilterList
FilterList代表一个过滤器链,它可以包含一组应用于数据集的过滤器,过滤器间有“与”FilterList.Operator.MUST_PASS_ALL、“或”FilterList.Operator.MUST_PASS_ONE关系。
2)SingleColumnValueFilter
SingleColumnValueFilter用于判断列值相等、不等或者单侧关系。
注意:根据列的值来决定这一行数据是否返回,落脚点在行,而不是列。我们可以设置filter.setFilterIfMissing(true);如果为true,当这一列不存在时,不会返回,如果为false,当这一列不存在时,会返回所有的列信息。
1 byte[] family = Bytes.toBytes("cf");
2 byte[] qualify = Bytes.toBytes("01002002");
3 byte[] eqb = Bytes.toBytes("201803");
4 SingleColumnValueFilter scvf = new SingleColumnValueFilter(
5 family,
6 qualify,
7 CompareFilter.CompareOp.EQUAL,
8 eqb);
9 scvf.setFilterIfMissing(true);
10 Connection conn = null;
11 try {
12 conn = getConnection();
13 Table table = getTable(conn);
14 Scan scan = new Scan();
15 scan.setFilter(scvf);
16 ResultScanner rs = table.getScanner(scan);
17 printFirst(rs);
18 } catch (IOException e) {
19 e.printStackTrace();
20 } finally {
21 conn.close();
22 }
3)键值元数据
由于hbase采用键值对保存内部数据,键值元数据过滤器评估一行的键(ColumnFamily:Qualifiers)是否存在。
a、基于列族过滤FamilyFilter
1 byte[] family = Bytes.toBytes("cf2");
2 FamilyFilter ff = new FamilyFilter(
3 CompareFilter.CompareOp.EQUAL,
4 new BinaryComparator(family));
5 Connection conn = null;
6 try {
7 conn = getConnection();
8 Table table = getTable(conn);
9 Scan scan = new Scan();
10 scan.setFilter(ff);
11 ResultScanner rs = table.getScanner(scan);
12 printFirst(rs);
13 } catch (IOException e) {
14 e.printStackTrace();
15 } finally {
16 conn.close();
17 }
b、基于限定符Qualifier(列)过滤QualifierFilter
1 byte[] qualifier = Bytes.toBytes("01002002");
2 QualifierFilter qf = new QualifierFilter(
3 CompareFilter.CompareOp.EQUAL,
4 new BinaryComparator(qualifier));
5 Connection conn = null;
6 try {
7 conn = getConnection();
8 Table table = getTable(conn);
9 Scan scan = new Scan();
10 scan.setFilter(qf);
11 ResultScanner rs = table.getScanner(scan);
12 printFirst(rs);
13 } catch (IOException e) {
14 e.printStackTrace();
15 } finally {
16 conn.close();
17 }
c、基于Qualifier前缀过滤ColumnPrefixFilter(在QualifierFilter中设置Comparator也可实现)
d、基于多Qualifiers前缀过滤MultipleColumnPrefixFilter(多个Qualifiers之间是“与”的关系)
1 byte[][] qualifiers = {Bytes.toBytes("01002002"), Bytes.toBytes("01002006")};
2 MultipleColumnPrefixFilter mcpf = new MultipleColumnPrefixFilter(qualifiers);
3 Connection conn = null;
4 try {
5 conn = getConnection();
6 Table table = getTable(conn);
7 Scan scan = new Scan();
8 scan.setFilter(mcpf);
9 ResultScanner rs = table.getScanner(scan);
10 printFirst(rs);
11 } catch (IOException e) {
12 e.printStackTrace();
13 } finally {
14 conn.close();
15 }
e、基于列范围过滤ColumnRangeFilter
1 byte[] minCol = Bytes.toBytes("01002002");
2 byte[] maxCol = Bytes.toBytes("01002008");
3 /** 返回所有列中01002002*到01002008*的数据 */
4 ColumnRangeFilter rcf = new ColumnRangeFilter(minCol, true, maxCol, true);
5 Connection conn = null;
6 try {
7 conn = getConnection();
8 Table table = getTable(conn);
9 Scan scan = new Scan();
10 scan.setFilter(rcf);
11 ResultScanner rs = table.getScanner(scan);
12 printFirst(rs);
13 } catch (IOException e) {
14 e.printStackTrace();
15 } finally {
16 conn.close();
17 }
4)RowKey
当需要根据行键特征查找一个范围的行数据时,使用Scan的startRow和stopRow会更高效,但是,startRow和stopRow只能匹配行键的开始字符,不能匹配中间包含的字符。当对行键进行更复杂的过滤时,可以使用RowFilter。
1 byte[] rk = Bytes.toBytes(StringUtils.reverse("15218818255"));
2 byte[] startRow = Bytes.toBytes("510000000000");
3 byte[] stopRow = Bytes.toBytes("590000000000");
4 RowFilter rf = new RowFilter(
5 CompareFilter.CompareOp.EQUAL,
6 new BinaryComparator(rk));
7 Connection conn = null;
8 try {
9 conn = getConnection();
10 Table table = getTable(conn);
11 Scan scan = new Scan();
12 scan.setFilter(rf);
13 scan.setStartRow(startRow);
14 scan.setStopRow(stopRow);
15 ResultScanner rs = table.getScanner(scan);
16 printFirst(rs);
17 } catch (IOException e) {
18 e.printStackTrace();
19 } finally {
20 conn.close();
21 }
5)PageFilter
指定页面行数,返回对应行数的结果集。
需要注意的是,该过滤器是分别作用到各个region server的,它只能保证当前查询的每个region返回的结果行数不超过指定页面行数。
1 byte[] startRow = Bytes.toBytes("510000000000");
2 /** startRow与stopRow同时落在一个region中,因此返回的行数等于页面行数 */
3 byte[] stopRow = Bytes.toBytes("540000000000");
4 /** startRow与stopRow分别落在两个region中,因此返回的行数为页面行数的2倍 */
5 //byte[] stopRow = Bytes.toBytes("590000000000");
6 PageFilter pf = new PageFilter(2L);
7 Connection conn = null;
8 try {
9 conn = getConnection();
10 Table table = getTable(conn);
11 Scan scan = new Scan();
12 scan.setFilter(pf);
13 scan.setStartRow(startRow);
14 scan.setStopRow(stopRow);
15 ResultScanner rs = table.getScanner(scan);
16 System.out.println("Effect row is " + countRow(rs));
17 } catch (IOException e) {
18 e.printStackTrace();
19 } finally {
20 conn.close();
21 }
6)SkipFilter
根据整行中的每个列来做过滤,只要存在一列不满足条件,整行都被过滤掉,通常配合ValueFilter一起使用。
7)FirstKeyOnlyFilter
该过滤器仅仅返回每一行中的第一个cell的值,可以用于高效的执行行数统计操作。
1 FirstKeyOnlyFilter fkof = new FirstKeyOnlyFilter();
2 Connection conn = null;
3 try {
4 conn = getConnection();
5 Table table = getTable(conn);
6 Scan scan = new Scan();
7 scan.setFilter(fkof);
8 ResultScanner rs = table.getScanner(scan);
9 System.out.println("Effect row is " + countRow(rs));
10 } catch (IOException e) {
11 e.printStackTrace();
12 } finally {
13 conn.close();
14 }
更多的Comparator和Filter介绍,请查看官网文档:
http://hbase.apache.org/apidocs/index.html
三、基于Filter进行用户标签查询
案例分析:查询同时具有“01002004”、“01002001”和“01003001”三个标签属性的用户数据(hbase库表用户数据>8千万行)。
代码实现:
1 long start = System.currentTimeMillis();
2 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
3 byte[] family = Bytes.toBytes("cf");
4 byte[] qualifier1 = Bytes.toBytes("01003001");
5 byte[] qualifier2 = Bytes.toBytes("01002004");
6 byte[] qualifier3 = Bytes.toBytes("01002001");
7 byte[] val = Bytes.toBytes("201803");
8 SingleColumnValueFilter scvf1 = new SingleColumnValueFilter(family, qualifier1, CompareFilter.CompareOp.EQUAL, val);
9 SingleColumnValueFilter scvf2 = new SingleColumnValueFilter(family, qualifier2, CompareFilter.CompareOp.EQUAL, val);
10 SingleColumnValueFilter scvf3 = new SingleColumnValueFilter(family, qualifier3, CompareFilter.CompareOp.EQUAL, val);
11 scvf1.setFilterIfMissing(true);
12 scvf2.setFilterIfMissing(true);
13 scvf3.setFilterIfMissing(true);
14 filterList.addFilter(scvf1);
15 filterList.addFilter(scvf2);
16 filterList.addFilter(scvf3);
17 Connection conn = null;
18 long total = 0;
19 try {
20 conn = getConnection();
21 Table table = getTable(conn);
22 Scan scan = new Scan();
23 scan.setFilter(filterList);
24 ResultScanner rs = table.getScanner(scan);
25 total = printRow(rs);
26 } catch (IOException e) {
27 e.printStackTrace();
28 } finally {
29 conn.close();
30 long end = System.currentTimeMillis();
31 System.out.println("This query take time " + (end-start)/1000 + "s, effect row is " + total);
32 }
分析结果:This query take time 734s, effect row is 135599
结论:库表数据中Qualifiers同时满足这三个条件的记录较多,耗时非常长,从技术原理来看,filter的本质还是采用scan方式来实现,hbase的scan操作是非常耗时的,特别是全表scan,如果想要提高scan性能,必须设置一个合理的startRow和stopRow范围。
以上是关于基于hbase进行用户画像查询的主要内容,如果未能解决你的问题,请参考以下文章