基于 MaxCompute + Hologres 的人群圈选和数据服务实践

Posted 阿里云云栖号

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于 MaxCompute + Hologres 的人群圈选和数据服务实践相关的知识,希望对你有一定的参考价值。

简介: 本文主要介绍如何通过 MaxCompute 进行海量人群的标签加工,通过 Hologres 进行分析建模,从而支持大规模人群复杂圈选场景下的交互式体验,以及基于API的数据服务最佳实践。

本文作者 刘一鸣 阿里云智能 高级产品专家

人群圈选系统基本逻辑架构

人群圈选并不是一个新业务, 几乎所有的互联网公司都在做,因为这是一个基本营销场景,选定的人群要发代金券,要导入流量,要做针对性促销,要选择合适的人群,那怎么做这件事情呢?实际上要通过人群的行为特征,采购特性,关注特征,兴趣特性,甚至是教育程度等等,把人群划分成不同的组。通过划分人群组,在有限的营销预算里面,将资源投放给转化率或点击率最高的人群。

基本的业务架构逻辑如下图,自下而上。首先是标签加工引擎,主要以离线加工为主。在标签加工引擎内,会对用户历史的采购行为、访问行为、关注行为等等做很多标签,可以统计出来,哪些人对哪些商品关注过多少次,点击过多少次,留意过多少次等等,会有很多统计性的属性在里面。这些标签会导入到在线画像服务引擎内,服务引擎是给运营人员,广告主进行交互式的查询,因为要根据用户行为特征,筛选出最关注的用户群体。这个用户群体可能是30天内关注某些商品但是没有买的群体,或者是相关的上下游产品,通过行为特征筛选出来。筛选过程是一个高度交互过程,因为一个人的行为特征是非常复杂的。所以需要频繁的选择某个条件,去掉某个条件,条件和条件之间可能会做合并、去重的操作等等。直到把人群大小限定到预算可支持的范围内,比如我们要投递给1万个人广告,那要通过各种限定条件把这1万个人找出来。找出来还不能做直接投递,还需要对人群做更细粒度的分析,通过历史数据行为分析这1万个人是不是想要的目标人群。之后会把目标群体以投递包的形式,导出给投递系统。这是一个基本业务逻辑。

那这个业务需求的背后技术要求是什么呢?

典型人群业务版块的核心是洞察分析,洞察分析一般的场景是,要支撑几万个不同的广告主,他们会在平台上自由选择更感兴趣的人群,每个广告主对人群的诉求是不一样的,某些人关注的购买力,有些关注的是收藏行为。每天数万广告主发出数百万次的查询请求,构建数万次各种人群包,各个系统的计算复杂度是要求非常高的。这里的核心诉求包含几个点,毫秒级洞察,因为所有的查询希望是交互式的,需要在界面上每一次互动,每一次下拉菜单,每一次选择,每一次条件组合,都希望看到一个互动结果,整个人群是变大还是变小,目标人群是不是跟期望的相似。这一点对性能要求是非常大的。同时提醒大家,数据一定要脱敏处理,保护好用户的个人隐私,所有的分析都是建立在合规数据基础之上的分析。

人群圈选系统服务引擎核心诉求

规模数据上的交互式分析性能

数万广告主提交数百万次的数据查询,需要毫秒级的响应,查的快是必须的。这个快加了一个限定词,是规模数据。百万级不算规模,行为日志是非常大的,希望是百亿级别以上,依旧有一个很好的交互式分析能力,能够在秒级响应。

灵活筛选能力

用户的筛选行为多种多样,等值比较、数值大小范围比较、时间范围比较等。各种各样的筛选条件能够灵活组合,表达这些筛选结果就体现出来计算引擎的能力。

高吞吐更新能力

用户标签并不是静态的,当前一切实时化,一切在线化,所有的行为数据变化,都希望能够实时触发,实时反馈下一时刻的系统决策。比如最新收藏夹里放了什么商品,这种行为能不能成为在线画像的一部分。所以对高实时的吞吐能力要求会很高。

从计算层面来讲,可以分成下图几种计算模式。

标签过滤分为等值过滤,可以用Equal/In/Between,这些过滤可以在百亿级别上进行操作。操作之后的结果集,要做很多的交差并集,举个常见例子,一个用户既关注了竞品品牌也关注了本公司商品,却没有买,这里面其实有并的关系,有差的关系,有交的关系。所以这些人群关系之间要组合,有很高的交差并集计算。最后还有很强的精确去重的需求,因为最终要把计算结果,变成一个唯一定位用户的ID,这个ID会用来做广告的投递。那这些需求,在引擎层面上就是数据读取效率怎么样,如果用行存读取是不是会出现IO放大的问题,数据按行去存,真正过滤是按照某一列过滤,但是IO读取,会把整行读取,会出现IO放大问题。列存还会有索引问题、过滤效果问题。计算算子上表连接时是Hash JOIN方式还是用Nest Loop JOIN方式。精确去重的效果如何。这些都是对计算引擎效率上有很高的要求。所以本质上是要解决高效数据存储与过滤、关系运算内存/CPU消耗、精确去重内存/CPU消耗问题。

这里就有很多不同的解决优化思路,是用更多的内存还是CPU。行业内大致的思路有两种。

一种是通过预结算思路,有Kylin/Druid这样的技术。这些技术可以在一些预定义的维度上,进行一次提前的预加工。预加工后,数据集会在本质上进行减少。比如要找一个用户群体,关注了第一个商品却没有关注第二个商品。每一个结果集都可以用bitmap数组来表达,数组之间做交差并集效率是非常高的。预计算技术实际上是把精确去重和交差并集上计算是有很大好处的。但缺陷也比较明显,最大的缺陷就是不灵活,同时完整SQL表达能力也比较弱。另一种是属于MPP分布式数据库技术,一些通过列存、分布式、索引方式提供更好的查询性能。

所以真正落地一套人群筛选方案时,一般不是只选择一个方案。因为不管是预计算方案还是MPP方案都有一些本质的缺陷。

那市场上哪些技术更适合做存储和查询呢?

第一类技术,大家都比较熟悉的事务数据库。事务数据库是行存储,对单行数据写入存储效率是非常高的,用来做查询,做过滤统计,在千万级以上会发现消耗资源是非常大的。所以一般不会拿TP系统直接做分析操作。

第二类系统,AP系统,是我们常见OLAP系统。这一类系统针对大规模数据扫描场景做了优化,包括利用分布式技术,列存技术,压缩技术、索引的技术等等。这类技术查的都很快,但本质缺陷是大部分系统更新上做的不太友好,因为数据查的快,所以数据该紧凑紧凑,该压缩压缩,所以在更新能力上弱一些。还有一类系统,在大数据分析也常见,我们把它叫Serving系统,支持在线业务的一类系统,这类系统查的是足够快,但牺牲的其实是查询的灵活性。比如,文档数据库、KeyValue系统查询方式有很大的局限,只能按照它的key去查询。这样灵活性减少了,但是性能上无限放大,因为可以横向扩展,因为key相对来说访问效率是最高的,而且更新效率也非常高,按照key更新,可以替换整条记录。我们过去就不得不针对不同场景,把数据拆分到TP、AP、Serving,数据在几个系统之间来回传递。让我们对整个系统的依赖度变的更高,只要数据有一次依赖,就会产生一次数据不一致,产生数据不一致就意味着数据的修正,数据的开发成本变的更高。所以大家都会在很多领域做创新,第一类创新是在TP和AP领域里做一个混合负载能力。尝试通过一个技术把这两个场景解决掉。有支持事务,又能支持分析,也希望未来有一天这个系统真正很好的落地。这类系统也有一定的局限,要支持事务操作,各种分布式锁开销还是必不可少的。这类系统因为具备了一些能力,所以在整个并发和性能上,开销是比较大的,所以有一定的性能瓶颈。

在下图左侧部分也是可以做一些创新的,左侧的创新会发现最大的问题是不支持事务。把事务能力弱化,不需要那么多事务,希望查的足够快,更新的足够快。所以这个地方是有可能做技术创新,这个技术既具备很好的灵活的分析能力,也具备很好的数据写入能力,有具备完整的SQL表达能力。所以左侧的交集部分的技术,很适合刚才提到的三点技术要求。这就是今天要分享的产品Hologres。

Hologres=向量化SQL引擎 + 灵活的多维分析+高吞吐实时更新

Hologres,一站式实时数仓,提供实时分析(OLAP)与在线服务(点查)两种能力,与MaxCompute无缝打通,实现一套架构,多种负载(OLAP、在线服务、交互式分析)共存,减少数据孤岛,避免数据割裂,简化链路,提升用户体验。

统一存储

  • 一份数据支持多种负载 (OLAP、在线服务、MaxCompute交互式分析),减少数据割裂
  • 数据无孤岛,无频繁数据导入导出,提高数据开发效率、简化链路

统一接口

  • 接口兼容开源Postgres协议,支持主流开发和BI工具,无需应用层重写,生态开放
  • 统一用SQL描述多种场景,提高数据应用开发效率
  • 统一数据模型,通过“表”来描述数仓模型,语义一致

实时离线一体

  • 支持实时写入、实时更新、写入即可查,原生集成Flink
  • 与MaxCompute存储无缝打通,透明加速,无需数据移动,支持交互式分析能力,支持实时数据关联历史数据

高性能

  • OLAP场景性能好于Clickhouse、Impala、Presto,支持亚秒级响应与高QPS
  • 在线服务(点查)场景性能好于HBase,点查支持100K+QPS

Hologres:一站式实时数仓

Hologres为什么能支持高性能,高吞吐写入?

实际上没有神秘的地方,Hologres更多还是依赖于整个IT行业,有很多底层技术上的进步。比如,带宽变宽,延迟变低。好处是之前必须依赖本地的操作,比如之前依赖本地磁盘,现在可以依赖网盘。其实Hologres底层的存储,分多副本存储,高可靠存储,把这些负责状态管理的事情,都交给阿里云,底层是盘古存储引擎,自带多副本,自带压缩,自带缓存,自带高可靠。这就会使整个计算节点的逻辑变的轻薄和简单,也让高可靠更加简单。任何一个节点宕掉之后,可以很快从一个分布式的网盘里恢复状态。会让计算层变的无状态,这是第一点。第二点是磁盘的利用,过去磁盘的转速有机械瓶颈。机械磁盘是按圈去转的,一秒钟多少转。所以我们的IO场景都是面向扫描场景做了大量的优化。我们希望所有的数据都是以块为单位,进行更新、读写。所以在过去这种高更新场景,在整个数仓里很难实现。Hologres是采用SSD设计,固态硬盘支持更好的随机读写能力。这让我们设计存储架构的时可以抛开过去必须依赖于这种扫描场景,去设计整个存储的数据结构。Hologres可以行存也可以列存,分别适应不同的场景,同时也采用log structured merge tree 的方式。支持高吞吐数据的写入和更新的场景。第三个是CPU多核化,CUP的主频已经不会有本质的提升。但是在多核化场景下,如果可以把一CPU内部多个核并行利用起来,就能把CPU资源充分发挥到极致。这就要求对操作系统的底层语言掌握的要比较好,Hologres使用C++实现的数仓。Hologres底层的算子都会用向量化方式重写,尽量发挥多核化并行计算能力,吧计算力发挥到极致。

从下图可以看出,我们在网络上、存储上、计算上、硬件层面有很多改进,这些改进都充分发挥出来,能够做出一个不一样的效果的系统。

人群圈选场景之前提到,既有预计算场景,又有MPP分布式计算场景。使用单一某一个技术往往不太适合,真正落地的时候,希望既有预计算又有分布式计算,要把两个技术更好的整合在一起。比如维度过滤场景就很适合用BITMAP,因为可以在BITMAP上做位图索引。如true和false的场景,购买级别、对什么产品关注等等,这些需要过滤的场景就适合做位图索引。Hologres是支持位图索引的。

第二种是关系运算,关系运算是我们提到的各种数据集之间的交差并,也非常适合位图计算。因为位图计算相当于是0和1之间,做很多与或差的操作,而且是并行操作,效率也是非常高的。

精确去重是BITMAP天生就具备的能力,因为位图在构建时,就通过下标位,就唯一确定了ID。通过不同下标位之间上面一的值的简单累加,就可以很快计算出精确去重的值是多少。这几乎是把一个O(N)的问题变成O(1)的场景,效果也非常明显。所以在做人群圈选场景里面,预计算是很重要的技术。Hologres支持RoaringBitmap数据类型,高效率实现Bitmap的交叉并计算。

上文提到预计算是灵活性不足,需要通过分布式计算把计算力发挥出来,就用到了Hologres的向量化执行引擎。对MaxCompute数据外表直接加速,包括MaxCompute数据同步到Hologres里,是会比MaxCompute同步到其它数据源性能提高10倍已上。

典型架构图

典型架构图如下,数据源基本是通过埋点数据,通过消息中间件kafka,第一件时间投递到Flink,做一次轻量级数据加工,包括数据治理的修正,数据轻度汇总,数据维度拉宽。其中维度关联是一个很重要的场景,真正的埋点数据都是记录某些ID,这些ID都要转换成有属性意义的维度信息。第一件事就是做维度拉宽,这是就可以使用Hologres的行存表,维度关联时,基本是通过主键去关联的,使用Hologres的行存表,可以存几亿几十亿的维度信息。这些信息可以实时的被更新。加工的结果集会写到kafka里面,因为并不是一次加工,可能是加工几个循环。通过kafka做消息驱动的方式,在Flink里面做几次加工,加工的结果基本上双写的场景会比较多,一部分实时写入Hologres,另一部分以批量方式写到MaxCompute里面。离线数仓到实时数仓是一个很好的数据修正的场景,数据是一定会被修正的,所以会有大量通过离线数仓对实时数仓进行修正的场景,包括标签加工也是典型的离线数仓来补充实时数仓的场景。所以一些行为是需要通过离线数仓加工好之后,把数据同步到实时数仓里。但有另外一些属性,是跟当下决策有关系的。这些是可以直接写到实时数仓Hologres里。所以可以把标签分为离线和实时两部分,实时写到Hologres,离线通过MaxCompute加工后同步到Hologres。

在对外提供数据服务是,有几种方式。建议的方式是,对外提供服务时,加一个网关,网关服务里面会做很多限流、熔断等等,这也是能提高数据服务稳定性的一个很好的帮助。如果是对内使用交互式分析的长治,可以直接通过JDBC的方式连接Hologres,如果是一个在线应用,建议通过API网关连接到Hologres。

MaxCompute

Hologres

使用场景

ETL加工,标签加工

在线查询、面向外部应用,交互式分析

用户使用

异步的Job/Task

同步的Query

集群资源

共享大集群,计算时资源可扩展

独享集群,计算资源预分配

计算引擎

基于Stage和File设计的,持久化的,可扩展SQLEngine

基于内存的,超快速响应的SQLEngine,计算不落盘

调度方式

进程级别,运行时分配

轻量级线程,资源预留

扩展性

几乎不受限制

复杂查询尽量避免跨多节点数据shuffle

存储格式

列式

行式、列式共存,面向不同场景

存储成本

基于Pangu,HDD成本低

基于Pangu,SSD成本相对高

更新方式

批量更新

实时更新、批量更新

接口标准

MCSQL

PostgreSQL

数据结构层

离线数仓加工两张表,一个是用户基础属性表,记录一些用户属性,性别城市年龄等。一个是交易明细表,记录某个人在某一天针对某个商品买过多少,看过多少,收藏多少等。这些通过离线数仓加工好后,数据导入Hologres。在通过配置把表列描述信息以人类可读的方式描述出来,再配置相关属性标签。把标签上线后,广告主会通过交互界面进行配置筛选。这种筛选背后都是翻译成各种SQL语句,其实就是个各种SQL表达式。真正把查询下发到底层引擎。那下发时底层引擎该如何建表呢?

宽表模式

•每行描述一个用户的标签组合,每个key是一列,每一行对应value。

•列不建议超过300列,列多会降低实时写入的性能。分为热点标签和非热点标签

•热点标签独立为列,具备明确的数据类型,可以针对性设计索引,对查询友好

•非热点标签,通过数组类型和JSON支持,适合动态更新,但索引不是最优,可扩展性更好

•适应场景:维度属性数量较低;实时写入频繁;更新以人的单位

•优势:开发简单快速上线

•方案描述:

用户数据:例如user_tags表,宽表

行为数据:例如shop_behavior表,事实表

更新时,可以实时、批量更新不同的列

案例

-------------------- 用户标签维度表 ---------------------
begin;
--3个热点标签字段(text、integer、boolean类型),2个扩展标签字段(text[]类型和JSON类型)
create table user_tags
(
  user_id text not null primary key,
  city_id text,
  consume_level integer,
  marriaged boolean,
  tag_array text[],
  tag_json json
);
call set_table_property('user_tags', 'orientation', 'column');
-- 分布列
call set_table_property('user_tags', 'distribution_key', 'user_id');
-- text类型设置bitmap索引
call set_table_property('user_tags', 'bitmap_columns', 'city_id,tag_array');
-- 热点标签,这是字典编码
call set_table_property('user_tags', 'dictionary_encoding_columns', ‘city_id:auto’);
commit;
-------------------- 用户行为事实表 ---------------------
begin;
create table shop_behavior
(
  user_id text not null,
  shop_id text not null,
  pv_cnt integer,
  trd_amt integer,
  ds integer not null
);
call set_table_property('shop_behavior', 'orientation', 'column');
call set_table_property('shop_behavior', 'distribution_key', 'user_id');
--- 聚合键 对group by等运算更加友好
call set_table_property('shop_behavior', 'clustering_key', 'ds,shop_id');
Commit;

窄表模式

将user_tag表转为窄表,每一个标签一行记录,标签名为一列,标签值为一列。

数据类型均退化为字符串类型,适合标签不固定,标签稀疏,允许牺牲部分性能但提高标签定义的灵活度。支持几十到几十万不同标签规模。

•适应场景:维度属性数量高;更新以标签的单位

•优势:开发简单快速上线

案例

-------------------- 用户标签维度表 ---------------------
begin;
create table tag2.user_tags
(
  userid text not null,
  tag_key text,
  tag_value text,
  ds text
) partition by list(ds);

call set_table_property('tag2.user_tags', 'orientation', 'column’); 
-- 分布列
call set_table_property('tag2.user_tags', 'distribution_key', 'user_id');
call set_table_property('tag2.user_tags', 'bitmap_columns', 'tag_key,tag_value');
call set_table_property('tag2.user_tags', 'dictionary_encoding_columns', 'tag_key:auto,tag_value:auto');
commit;
   
--查询例子--                        
WITH 
f1 AS (
    SELECT userid 
    FROM tag2.user_tags
    WHERE ds = '20210101'
    AND   tag_key = 'tag_single'
    AND   tag_value = 'myname'
),
f2 AS (
    SELECT userid 
    FROM tag2.user_tags
    WHERE ds = '20210101'
    AND   tag_key = 'tag_date'
    AND   tag_value > '20210101'
),
f3 AS (
    SELECT userid 
    FROM tag2.user_tags
    WHERE ds = '20210101'
    AND   tag_key = 'tag_numeric'
    AND   to_number(tag_value, '99G999D9S') > 90
),
f4 AS (
    SELECT userid 
    FROM tag2.user_tags
    WHERE ds = '20210101'
    AND   tag_key = 'tag_multi'
    AND   tag_value IN ('HONOR', 'MI')
)
SELECT COUNT(DISTINCT userid)
FROM ((SELECT userid FROM f1 UNION SELECT userid FROM f2) INTERSECT (SELECT userid FROM f3 EXCEPT SELECT userid FROM f4)) crowd;

预计算模式(宽表、窄表均适合)

对维度组合的人群固化为更优的数据结构bitmap

•适应场景:基数高,计算复杂度大,更新频率低场景

•优势:查询性能高

方案描述:

由于roaringbitmap需要整数类型作为ID参数,因此增加usermapping表做用户逻辑ID与底层物理ID的映射。

案例

-------------------- 用户标签维度表 ---------------------
BEGIN;
CREATE TABLE tag3.user_tags (
    "tag_key" text,
    "tag_value" text,
    "userlist" roaringbitmap,
    ds text
) partition by list(ds);
CALL SET_TABLE_PROPERTY('tag3.user_tags', 'orientation', 'column');
CALL SET_TABLE_PROPERTY('tag3.user_tags', 'bitmap_columns', 'tag_key,tag_value');
CALL SET_TABLE_PROPERTY('tag3.user_tags', 'dictionary_encoding_columns', 'tag_key:auto,tag_value:auto');
COMMIT;

begin;
create table tag3.usermapping
(
    userid_int serial,
    userid text
);
commit;

--构建RoaringBitmap--
INSERT INTO tag3. user_tags SELECT tag_key ,tag_value ,rb_build(array_agg(user_id::INT)) FROM tag2.user_tags GROUP BY tag_key ,tag_value 

--查询例子--
SELECT Rb_cardinality(Rb_and(Rb_or(t1.r, t2.r), Rb_andnot(t3.r, t4.r)))
FROM 
       (
              SELECT Rb_and_agg(userlist) AS r
              FROM   tag3.user_tags
              WHERE  ds = '20210101'
              AND    tag_key = 'tag_single'
              AND    tag_value = 'myname' ) AS t1,
       (
              SELECT rb_and_agg(userlist) AS r
              FROM   tag3.user_tags
              WHERE  ds = '20210101'
              AND    tag_key = 'tag_date'
              AND    tag_value > '20210101' ) AS t2,
       (
              SELECT rb_and_agg(userlist) AS r
              FROM   tag3.user_tags
              WHERE  ds = '20210101'
              AND    tag_key = 'tag_numeric'
              AND    to_number(tag_value, '99G999D9S') > 90 ) AS t3,
       (
              SELECT rb_and_agg(userlist) AS r
              FROM   tag3.user_tags
              WHERE  ds = '20210101'
              AND    tag_key = 'tag_multi'
              AND    tag_value IN ('HONOR',
                                   'MI') ) AS t4

用户画像与圈定的一些经验

•标签分为主画像和扩展画像多张表,区分高频访问和低频访问

•标签分为实时(Flink)更新和离线(MaxCompute)更新两部分,两部分共享一张表,减少运行时Join,Flink加工实时部分,MaxCompute加工离线部分,在Hologres中合并

•宽表模式简单,擅长定性分析

•窄表模式灵活,计算量大,擅长定量分析

•基于RoaringBitmap的预计算技术,用户体验最好,开发复杂度较高(比如bitmap分桶),SQL需要定制,适合DMP等有封装能力的平台,擅长UV

原文链接
本文为阿里云原创内容,未经允许不得转载。

以上是关于基于 MaxCompute + Hologres 的人群圈选和数据服务实践的主要内容,如果未能解决你的问题,请参考以下文章

Hologres揭秘:高性能原生加速MaxCompute核心原理

Hologres基于TPCH的性能测试介绍

Hologres如何基于roaringbitmap实现超高基数UV计算?

Hologres如何支持超高基数UV计算(基于roaringbitmap实现)

来电科技:基于Flink+Hologres的实时数仓演进之路

来电科技:基于 Flink + Hologres 的实时数仓演进之路