基于hbase进行用户画像查询

Posted Hadoop大数据之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于hbase进行用户画像查询相关的知识,希望对你有一定的参考价值。

用户画像是根据用户社会属性、生活习惯和消费行为等信息而抽象出的一个标签化的用户模型。构建用户画像的核心工作是给用户贴“标签”,而标签是通过对用户信息分析而来的高度精炼的特征标识,用户画像越精细对“精准营销”起的作用就越大。本文对用户画像的建设不做讨论,而是探讨一下如何存储及查询用户标签数据。用户标签的输出方式主要有两种:1、查询用户对应的标签属性;2、查询组合标签下的用户。,从hbase的应用特性及其特点可以满足第一点要求,那么根据组合标签查询用户应该如何处理呢?目前比较常用的处理方案:hbase+phoenix、hbase+elasticsearch,这两种方案后续作介绍,本文主要是探讨基于hbase的filter过滤器进行组合标签查询是否可行?


一、比较器介绍

有两个参数类在各类Filter中经常出现,下面统一介绍一下:

1)比较运算符CompareFilter.CompareOp,主要用户定义比较关系,可以有以下几类值:

  1. EQUAL    相等

  2. GREATER    大于

  3. GREATER_OR_EQUAL    大于等于

  4. LESS    小于

  5. LESS_OR_EQUAL    小于等于

  6. NOT_EQUAL    不等于

2)比较器ByteArrayComparable,通过比较器可以实现多样化匹配效果,比较器有以下子类可以使用:

  1. BinaryComparator    匹配完整字节数组

  2. BinaryPrefixComparator    匹配字节数组前缀

  3. BitComparator    位操作比较器

  4. NullComparator    空值比较器

  5. RegexStringComparator    正则表达式比较器

  6. SubstringComparator    子串匹配


二、过滤器介绍

1)FilterList

FilterList代表一个过滤器链,它可以包含一组应用于数据集的过滤器,过滤器间有“与”FilterList.Operator.MUST_PASS_ALL“或”FilterList.Operator.MUST_PASS_ONE关系。

2)SingleColumnValueFilter

SingleColumnValueFilter用于判断列值相等、不等或者单侧关系。

注意:根据列的值来决定这一行数据是否返回,落脚点在行,而不是列。我们可以设置filter.setFilterIfMissing(true);如果为true,当这一列不存在时,不会返回,如果为false,当这一列不存在时,会返回所有的列信息。

 1    byte[] family = Bytes.toBytes("cf");
2    byte[] qualify = Bytes.toBytes("01002002");
3    byte[] eqb = Bytes.toBytes("201803");
4    SingleColumnValueFilter scvf = new SingleColumnValueFilter(
5            family,
6            qualify,
7            CompareFilter.CompareOp.EQUAL,
8            eqb);
9    scvf.setFilterIfMissing(true);
10    Connection conn = null;
11    try {
12        conn = getConnection();
13        Table table = getTable(conn);
14        Scan scan = new Scan();
15        scan.setFilter(scvf);
16        ResultScanner rs = table.getScanner(scan);
17        printFirst(rs);
18    } catch (IOException e) {
19        e.printStackTrace();
20    } finally {
21        conn.close();
22    }

3)键值元数据

由于hbase采用键值对保存内部数据,键值元数据过滤器评估一行的键(ColumnFamily:Qualifiers)是否存在。

a、基于列族过滤FamilyFilter

 1    byte[] family = Bytes.toBytes("cf2");
2    FamilyFilter ff = new FamilyFilter(
3            CompareFilter.CompareOp.EQUAL,
4            new BinaryComparator(family));
5    Connection conn = null;
6    try {
7        conn = getConnection();
8        Table table = getTable(conn);
9        Scan scan = new Scan();
10        scan.setFilter(ff);
11        ResultScanner rs = table.getScanner(scan);
12        printFirst(rs);
13    } catch (IOException e) {
14        e.printStackTrace();
15    } finally {
16        conn.close();
17    }

b、基于限定符Qualifier(列)过滤QualifierFilter

 1    byte[] qualifier = Bytes.toBytes("01002002");
2    QualifierFilter qf = new QualifierFilter(
3            CompareFilter.CompareOp.EQUAL,
4            new BinaryComparator(qualifier));
5    Connection conn = null;
6    try {
7        conn = getConnection();
8        Table table = getTable(conn);
9        Scan scan = new Scan();
10        scan.setFilter(qf);
11        ResultScanner rs = table.getScanner(scan);
12        printFirst(rs);
13    } catch (IOException e) {
14        e.printStackTrace();
15    } finally {
16        conn.close();
17    }

c、基于Qualifier前缀过滤ColumnPrefixFilter(在QualifierFilter中设置Comparator也可实现)

d、基于多Qualifiers前缀过滤MultipleColumnPrefixFilter(多个Qualifiers之间是“与”的关系)

 1    byte[][] qualifiers = {Bytes.toBytes("01002002"), Bytes.toBytes("01002006")};
2    MultipleColumnPrefixFilter mcpf = new MultipleColumnPrefixFilter(qualifiers);
3    Connection conn = null;
4    try {
5        conn = getConnection();
6        Table table = getTable(conn);
7        Scan scan = new Scan();
8        scan.setFilter(mcpf);
9        ResultScanner rs = table.getScanner(scan);
10        printFirst(rs);
11    } catch (IOException e) {
12        e.printStackTrace();
13    } finally {
14        conn.close();
15    }

e、基于列范围过滤ColumnRangeFilter

 1    byte[] minCol = Bytes.toBytes("01002002");
2    byte[] maxCol = Bytes.toBytes("01002008");
3    /** 返回所有列中01002002*到01002008*的数据 */
4    ColumnRangeFilter rcf = new ColumnRangeFilter(minCol, true, maxCol, true);
5    Connection conn = null;
6    try {
7        conn = getConnection();
8        Table table = getTable(conn);
9        Scan scan = new Scan();
10        scan.setFilter(rcf);
11        ResultScanner rs = table.getScanner(scan);
12        printFirst(rs);
13    } catch (IOException e) {
14        e.printStackTrace();
15    } finally {
16        conn.close();
17    }

4)RowKey

当需要根据行键特征查找一个范围的行数据时,使用Scan的startRow和stopRow会更高效,但是,startRow和stopRow只能匹配行键的开始字符,不能匹配中间包含的字符。当对行键进行更复杂的过滤时,可以使用RowFilter。

 1    byte[] rk = Bytes.toBytes(StringUtils.reverse("15218818255"));
2    byte[] startRow = Bytes.toBytes("510000000000");
3    byte[] stopRow = Bytes.toBytes("590000000000");
4    RowFilter rf = new RowFilter(
5            CompareFilter.CompareOp.EQUAL,
6            new BinaryComparator(rk));
7    Connection conn = null;
8    try {
9        conn = getConnection();
10        Table table = getTable(conn);
11        Scan scan = new Scan();
12        scan.setFilter(rf);
13        scan.setStartRow(startRow);
14        scan.setStopRow(stopRow);
15        ResultScanner rs = table.getScanner(scan);
16        printFirst(rs);
17    } catch (IOException e) {
18        e.printStackTrace();
19    } finally {
20        conn.close();
21    }

5)PageFilter

指定页面行数,返回对应行数的结果集。

需要注意的是,该过滤器是分别作用到各个region server的,它只能保证当前查询的每个region返回的结果行数不超过指定页面行数。

 1    byte[] startRow = Bytes.toBytes("510000000000");
2    /** startRow与stopRow同时落在一个region中,因此返回的行数等于页面行数 */
3    byte[] stopRow = Bytes.toBytes("540000000000");
4    /** startRow与stopRow分别落在两个region中,因此返回的行数为页面行数的2倍 */
5    //byte[] stopRow = Bytes.toBytes("590000000000");
6    PageFilter pf = new PageFilter(2L);
7    Connection conn = null;
8    try {
9        conn = getConnection();
10        Table table = getTable(conn);
11        Scan scan = new Scan();
12        scan.setFilter(pf);
13        scan.setStartRow(startRow);
14        scan.setStopRow(stopRow);
15        ResultScanner rs = table.getScanner(scan);
16        System.out.println("Effect row is " + countRow(rs));
17    } catch (IOException e) {
18        e.printStackTrace();
19    } finally {
20        conn.close();
21    }

6)SkipFilter

根据整行中的每个列来做过滤,只要存在一列不满足条件,整行都被过滤掉,通常配合ValueFilter一起使用。

7)FirstKeyOnlyFilter

该过滤器仅仅返回每一行中的第一个cell的值,可以用于高效的执行行数统计操作。

 1    FirstKeyOnlyFilter fkof = new FirstKeyOnlyFilter();
2    Connection conn = null;
3    try {
4        conn = getConnection();
5        Table table = getTable(conn);
6        Scan scan = new Scan();
7        scan.setFilter(fkof);
8        ResultScanner rs = table.getScanner(scan);
9        System.out.println("Effect row is " + countRow(rs));
10    } catch (IOException e) {
11        e.printStackTrace();
12    } finally {
13        conn.close();
14    }

更多的Comparator和Filter介绍,请查看官网文档:

http://hbase.apache.org/apidocs/index.html


三、基于Filter进行用户标签查询

案例分析:查询同时具有“01002004”、“01002001”和“01003001”三个标签属性的用户数据(hbase库表用户数据>8千万行)。

代码实现:

 1    long start = System.currentTimeMillis();
2    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
3    byte[] family = Bytes.toBytes("cf");
4    byte[] qualifier1 = Bytes.toBytes("01003001");
5    byte[] qualifier2 = Bytes.toBytes("01002004");
6    byte[] qualifier3 = Bytes.toBytes("01002001");
7    byte[] val = Bytes.toBytes("201803");
8    SingleColumnValueFilter scvf1 = new SingleColumnValueFilter(family, qualifier1, CompareFilter.CompareOp.EQUAL, val);
9    SingleColumnValueFilter scvf2 = new SingleColumnValueFilter(family, qualifier2, CompareFilter.CompareOp.EQUAL, val);
10    SingleColumnValueFilter scvf3 = new SingleColumnValueFilter(family, qualifier3, CompareFilter.CompareOp.EQUAL, val);
11    scvf1.setFilterIfMissing(true);
12    scvf2.setFilterIfMissing(true);
13    scvf3.setFilterIfMissing(true);
14    filterList.addFilter(scvf1);
15    filterList.addFilter(scvf2);
16    filterList.addFilter(scvf3);
17    Connection conn = null;
18    long total = 0;
19    try {
20        conn = getConnection();
21        Table table = getTable(conn);
22        Scan scan = new Scan();
23        scan.setFilter(filterList);
24        ResultScanner rs = table.getScanner(scan);
25        total = printRow(rs);
26    } catch (IOException e) {
27        e.printStackTrace();
28    } finally {
29        conn.close();
30        long end = System.currentTimeMillis();
31        System.out.println("This query take time " + (end-start)/1000 + "s, effect row is " + total);
32    }

分析结果:This query take time 734s, effect row is 135599

结论:库表数据中Qualifiers同时满足这三个条件的记录较多,耗时非常长,从技术原理来看,filter的本质还是采用scan方式来实现,hbase的scan操作是非常耗时的,特别是全表scan,如果想要提高scan性能,必须设置一个合理的startRow和stopRow范围。

以上是关于基于hbase进行用户画像查询的主要内容,如果未能解决你的问题,请参考以下文章

基于HBase和Spark构建企业级数据处理平台

职位画像中phoenix链接HBase异常之版本不匹配

HBase赠书 | 阿里云HBase SQL一站式解决复杂查询难题

Spark DataFrame写入HBase的常用方式

Hbase优化

基于Flink商品实时推荐系统项目