ClickHouse实践

Posted   

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ClickHouse实践相关的知识,希望对你有一定的参考价值。

1.OLAP详解

OLTP + OLAP: T:transaction 事务处理 侧重于增删改 A : analysis 分析 Select大批量数据的聚合查询事务处理作用:保证数据的一致性,如果涉及到事务操作,这个操作的执行效率必然不高

OLAP + OLTP =====> 同时满足,很难涉及

mysql: insert update delete Hive ClickHouse: Select 查询分析的高效

读模式 + 写模式 OLAP一般都是读模式, OLTP 写模式 ClickHouse一出来,界限模糊了。 ClickHouse 写模式+ OLAP

海量数据做查询分析高效: 列式数据库, 写模式(保证同一列的数据类型是一样的: 方便压缩),排序

OLAP体系的重要三个特点: 排序 + 写模式 + 列式数据库

ClickHouse 全部都具备!

1.1 OLAP的场景分析

1).读多于写

​ 不同于事务处理(OLTP)的场景,比如电商场景中加购物车、下单、支付等需要在原地进行大量insert、update、delete操作,数据分析(OLAP)场景通常是将数据批量导入后,进行任意维度的灵活探索、BI工具洞察、报表制作等。

​ 数据一次性写入后,分析师需要尝试从各个角度对数据做挖掘、分析,直到发现其中的商业价值、业务变化趋势等信息。这是一个需要反复试错、不断调整、持续优化的过程,其中数据的读取次数远多于写入次数。这就要求底层数据库为这个 特点做专门设计,而不是盲目采用传统数据库的技术架构。

2).大宽表,读大量行但是少量列,结果集较小

在OLAP场景中,通常存在一张或是几张多列的大宽表,列数高达数百甚至数千列。对数据分析处理时,选择其中的少数几列作为维度列、其他少数几列作为指标列,然后对全表或某一个较大范围内的数据做聚合计算。这个过程会扫描大量的行数据,但是只用到了其中的少数列。而聚合计算的结果集相比于动辄数十亿的原始数据,也明显小得多。

例如:查询公司每个部门人有多少。

select department, count(id) as total from compant group by department;

3).数据批量写入,且数据不更新或少更新

OLTP类业务对于延时(Latency)要求更高,要避免让客户等待造成业务损失;而OLAP类业务,由于数据量非常大,通常更加关注写入吞吐(Throughput),要求海量数据能够尽快导入完成。一旦导入完成,历史数据往往作为存档,不会再做更新、删除操作

4).无需事务,数据一致性要求低

OLAP类业务对于事务需求较少,通常是导入历史日志数据,或搭配一款事务型数据库并实时从事务型数据库中进行数据同步。多数OLAP系统都支持最终一致性。

5).灵活多变,不适合预先建模

分析场景下,随着业务变化要及时调整分析维度、挖掘方法,以尽快发现数据价值、更新业务指标。而数据仓库中通常存储着海量的历史数据,调整代价十分高昂。预先建模技术虽然可以在特定场景中加速计算,但是无法满足业务灵活多变的发展需求,维护成本过高。

2.ClickHouse

DataType

​ 数据的序列化和反序列化工作由DataType负责。IDataType接口定义了许多正反序列化的方法,它们成对出现,例如serializeBinary和deserializeBinary、serializeTextJSON和deserializeTextJSON等,涵盖了常用的二进制、文本、JSON、XML、CSV和Protobuf等多种格式类型。IDataType也使用了泛化的设计模式,具体方法的实现逻辑由对应数据类型的实例承载,例如DataTypeString、DataTypeArray及DataTypeTuple等。

​ DataType虽然负责序列化相关工作,但它并不直接负责数据的读取,而是转由从Column或Field对象获取。在DataType的实现类中,聚合了相应数据类型的Column对象和Field对象。例如,DataTypeString会引用字符串类型的ColumnString,而DataTypeArray则会引用数组类型的ColumnArray,以此类推。

Block与Block流

​ ClickHouse内部的数据操作是面向Block对象进行的,并且采用了流的形式。虽然Column和Filed组成了数据的基本映射单元,但对应到实际操作,它们还缺少了一些必要的信息,比如数据的类型及列的名称。于是ClickHouse设计了Block对象,Block对象可以看作数据表的子集。Block对象的本质是由数据对象、数据类型和列名称组成的三元组,即Column、DataType及列名称字符串。Column提供了数据的读取能力,而DataType知道如何正反序列化,所以Block在这些对象的基础之上实现了进一步的抽象和封装,从而简化了整个使用的过程,仅通过Block对象就能完成一系列的数据操作。在具体的实现过程中,Block并没有直接聚合Column和DataType对象,而是通过 ColumnWithTypeAndName对象进行间接引用。

​ 有了Block对象这一层封装之后,对Block流的设计就是水到渠成的事情了。流操作有两组顶层接口:IBlockInputStream负责数据的读取和关系运算,IBlockOutputStream负责将数据输出到下一环节。Block流也使用了泛化的设计模式,对数据的各种操作最终都会转换成其中一种流的实现。IBlockInputStream接口定义了读取数据的若干个read虚方法,而具体的实现逻辑则交由它的实现类来填充。

​ IBlockInputStream接口总共有60多个实现类,它们涵盖了ClickHouse数据摄取的方方面面。这些实现类大致可以分为三类:

​ 第 一类用于处理数据定义的DDL操作,例如DDLQueryStatusInputStream 等;

​ 第二类用于处理关系运算的相关操作,例如LimitBlockInputStream、JoinBlockInputStream及AggregatingBlockInputStream等;

​ 第三类则是与表引擎呼应,每一种表引擎都拥有与之对应的BlockInputStream实现,例如MergeTreeBaseSelectBlockInputStream(MergeTree表引擎)、TinyLogBlockInputStream(TinyLog表引擎)及KafkaBlockInputStream(Kafka表引擎)等。

​ IBlockOutputStream的设计与IBlockInputStream如出一辙。IBlockOutputStream接口同样也定义了若干写入数据的write虚方法。它的实现类比IBlockInputStream要少许多,一共只有20多种。这些实现类基本用于表引擎的相关处理,负责将数据写入下一环节或者最终目的地,例如MergeTreeBlockOutputStream、 TinyLogBlockOutputStream及StorageFileBlock-OutputStream等。

Table

​ 在数据表的底层设计中并没有所谓的Table对象,它直接使用 IStorage接口指代数据表。表引擎是ClickHouse的一个显著特性,不同的表引擎由不同的子类实现,例如IStorageSystemOneBlock(系统表)、StorageMergeTree(合并树表引擎)和StorageTinyLog(日志表引擎)等。IStorage接口定义了DDL(如ALTER、RENAME、OPTIMIZE和DROP等)、read和write方法,它们分别负责数据的定义、查询与写入。在数据查询时,IStorage负责根据AST查询语句的指示要求,返回指定列的原始数据。后续对数据的进一步加工、计算和过滤,则会统一交由Interpreter解释器对象处理。对Table发起的一次操作通常都会经历这样的过程,接收AST查询语句,根据AST返回指定列的数据,之后再将数据交由Interpreter做进一步处理。

Parser与Interpreter

​ Parser和Interpreter是非常重要的两组接口:Parser分析器负责创建AST对象;而Interpreter解释器则负责解释AST,并进一步创建查询的执行管道。它们与IStorage一起,串联起了整个数据查询的过

程。Parser分析器可以将一条SQL语句以递归下降的方法解析成AST语法树的形式。不同的SQL语句,会经由不同的Parser实现类解析。例如,有负责解析DDL查询语句的ParserRenameQuery、ParserDropQuery和ParserAlterQuery解析器,也有负责解析INSERT语句的 ParserInsertQuery解析器,还有负责SELECT语句的 ParserSelectQuery等。

​ Interpreter解释器的作用就像Service服务层一样,起到串联整个查询过程的作用,它会根据解释器的类型,聚合它所需要的资源。首先它会解析AST对象;然后执行“业务逻辑”(例如分支判断、设置参数、调用接口等);最终返回IBlock对象,以线程的形式建立起一个查询执行管道。

Functions与Aggregate Functions

​ ClickHouse主要提供两类函数——普通函数和聚合函数。普通函数由IFunction接口定义,拥有数十种函数实现,例如FunctionFormatDateTime、FunctionSubstring等。除了一些常见的函数(诸如四则运算、日期转换等)之外,也不乏一些非常实用的函数,例如网址提取函数、IP地址脱敏函数等。普通函数是没有状态的,函数效果作用于每行数据之上。当然,在函数具体执行的过程中,并不会一行一行地运算,而是采用向量化的方式直接作用于一整列数据。

​ 聚合函数由IAggregateFunction接口定义,相比无状态的普通函数,聚合函数是有状态的。以COUNT聚合函数为例,其AggregateFunctionCount的状态使用整型UInt64记录。聚合函数的状态支持序列化与反序列化,所以能够在分布式节点之间进行传输,以实现增量计算。

2.1苏宁选择ClickHouse的原因

1)速度快

2)特性发布快

3)软件质量高

4)物化视图

5)高基数查询

6)精准去重计数

2.2 ClickHouse使用场景

1)适用场景

  • web和app数据分析
  • 广告网络和RTB
  • 电信
  • 电子商务和金融
  • 信息安全
  • 监测
  • 时序数据
  • 商业智能
  • 在线游戏
  • 物联网

2)不适用场景

  • 事务性工作(OLTP)
  • 高并发的键值访问
  • 文档存储
  • 超标准化的数据

2.3 ClickHouse的优点

1)真正的面向列的DBMS(ClickHouse是一个DBMS,而不是一个单一的数据库。它允许在运行时创建表和数据库、加载数据和运行查询,而无需重新配置和重新启动服务器)
2)数据压缩(一些面向列的DBMS(INFINIDB CE 和 MonetDB)不使用数据压缩。但是,数据压缩确实是提高了性能)
3)磁盘存储的数据(许多面向列的DBMS(SPA HANA和GooglePowerDrill))只能在内存中工作。但即使在数千台服务器上,内存也太小了。)
4)多核并行处理(多核多节点并行化大型查询)
5)在多个服务器上分布式处理(在clickhouse中,数据可以驻留在不同的分片上。每个分片都可以用于容错的一组副本,查询会在所有分片上并行处理)
6)SQL支持(ClickHouse sql 跟真正的sql有不一样的函数名称。不过语法基本跟SQL语法兼容,支持JOIN/FROM/IN 和JOIN子句及标量子查询支持子查询)
7)向量化引擎(数据不仅按列式存储,而且由矢量-列的部分进行处理,这使得开发者能够实现高CPU性能)
8)实时数据更新(ClickHouse支持主键表。为了快速执行对主键范围的查询,数据使用合并树(MergeTree)进行递增排序。由于这个原因,数据可以不断地添加到表中)
9)支持近似计算(统计全国到底有多少人?143456754 14.3E)
10)数据复制和对数据完整性的支持(ClickHouse使用异步多主复制。写入任何可用的复本后,数据将分发到所有剩余的副本。系统在不同的副本上保持相同的数据。数据在失败后自动恢复)

2.4 ClickHouse的缺点

​ ClickHouse 作为一个被设计用来在实时分析的 OLAP 组件,只是在高效率的分析方面性能发挥到极致,那必然就会在其他方面做出取舍:

1)没有完整的事务支持,不支持Transaction想快就别Transaction
2)缺少完整Update/Delete操作,缺少高频率、低延迟的修改或删除已存在数据的能力,仅用于批量删除或修改数据。
3)聚合结果必须小于一台机器的内存大小
4)支持有限操作系统,正在慢慢完善
5)开源社区刚刚启动,主要是俄语为主,中文社区:http://www.clickhouse.com.cn
6)不适合Key-value存储,不支持Blob等文档型数据库

3.搜索字符串算法

​ 在字符串搜索方面,针对不同的场景, ClickHouse最终选择了这些算法:对于常量,使用Volnitsky算法;对

于非常量,使用CPU的向量化执行SIMD,暴力优化;正则匹配使用re2 和hyperscan算法。性能是算法选择的首要考量指标。

4.MergeTree

建表语句

CREATE TABLE [IF NOT EXISTS] [db_name.]table_name ( 
    name1 [type] [DEFAULT|MATERIALIZED|ALIAS expr], 
    name2 [type] [DEFAULT|MATERIALIZED|ALIAS expr], 
    省略... ) 
   ENGINE = MergeTree() 
   [PARTITION BY expr]
   [ORDER BY expr] 
   [PRIMARY KEY expr] 
   [SAMPLE BY expr] 
   [SETTINGS name=value, 省略...]

(1)PARTITION BY [选填]

​ 分区键,用于指定表数3据以何种标准进行分区。分区键既可以是单个列字段,也可以通过元组的形式使用多个列字段,同时它也支持使用列表达式。如果不声明分区键,则ClickHouse会生成一个名为all的分区。合理使用数据分区,可以有效减少查询时数据文件的扫描范围,更多关于数据分区的细节会在6.2节介绍。

(2)ORDER BY [必填]

​ 排序键,用于指定在一个数据片段内, 数据以何种标准排序。默认情况下主键(PRIMARY KEY)与排序键相同。排序键既可以是单个列字段,例如ORDER BY CounterID,也可以通过元组的形式使用多个列字段,例如ORDERBY(CounterID,EventDate)。当使用多个列字段排序时,以ORDER BY(CounterID,EventDate)为例,在单个数据片段内,数据首先会以CounterID排序,相同CounterID的数据再按EventDate排序。

(3)PRIMARY KEY [选填]

​ 主键,顾名思义,声明后会依照主键字段生成一级索引,用于加速表查询。默认情况下,主键与排序键(ORDER BY)相同,所以通常直接使用ORDER BY代为指定主键,无须刻意通过PRIMARY KEY声明。所以在一般情况下,在单个数据片段内,数据与一级索引以相同的规则升序排列。与其他数据库不同,MergeTree 主键允许存在重复数据(ReplacingMergeTree可以去重)。

(4)SAMPLE BY [选填]

​ 抽样表达式,用于声明数据以何种标准进行采样。如果使用了此配置项,那么在主键的配置中也需要声明同样的表达式,例如:

 省略... 

) ENGINE = MergeTree() 

ORDER BY (CounterID, EventDate, intHash32(UserID) 

SAMPLE BY intHash32(UserID) 

抽样表达式需要配合SAMPLE子查询使用,这项功能对于选取抽样数据十分有用,更多关于抽样查询的使用方法会在第9章介绍。

(5)SETTINGS:index_granularity [选填]

​ index_granularity对于MergeTree而言是一项非常重要的参数,它表示索引的粒度,默认值为8192。也就是说,MergeTree的索引在默认情况下,每间隔8192行数据才生成一条索引,其具体声明方式如下所示:

省略... 

) ENGINE = MergeTree() 

省略... 

SETTINGS index_granularity = 8192; 

8192是一个神奇的数字,在ClickHouse中大量数值参数都有它的影子,可以被其整除(例如最小压缩块大小

min_compress_block_size:65536)。通常情况下并不需要修改此参数,但理解它的工作原理有助于我们更好地使用MergeTree。关于索引详细的工作原理会在后续阐述。

(6)SETTINGS:index_granularity_bytes [选填]

​ 在19.11版本之前,ClickHouse只支持固定大小的索引间隔,由index_granularity控制,默认为8192。在新版本中,它增加了自适应间隔大小的特性,即根据每一批次写入数据的体量大小,动态划分间隔大小。而数据的体量大小,正是由index_granularity_bytes参数控制的,默认为10M(10×1024×1024),设置为0表示不启动自适应功能。

(7)SETTINGS:enable_mixed_granularity_parts [选填]

​ 设置是否开启自适应索引间隔的功能,默认开启。

(8)SETTINGS:merge_with_ttl_timeout [选填]

​ 从19.6版本开始,MergeTree提供了数据TTL的功能,关于这部分的详细介绍,将

留到第7章介绍。

(9)SETTINGS:storage_policy [选填]:

​ 从19.15版本开始,MergeTree提供了多路径的存储策略,关于这部分的详细介绍,同样留

到第7章介绍。

5.BitMap

5.1为什么用Bitmap

1)存储成本

假设有个1,2,5的数字集合,如果常规的存储方法,要用3个Int32空间。其中一个Int32就是32位的空间。三个就是3*32Bit,相当于12个字节。

如果用Bitmap怎么存储呢,只用8Bit(1个字节)就够了。每一位代表一个数,位号就是数值,1标识有,0标识无。如下图:

76543210
00100110

这样的一个字节可以存8个整数,每一个数的存储成本实质上是1Bit。

也就是说Bitmap的存储成本是Array[Int32]的1/32,是Array[Int64]的1/64。

好处一: 如果有一个超大的无序且不重复的整数集合,用Bitmap的存储成本是非常低的。

2)天然去重

好处二: 因为每个值都只对应唯一的一个位置,不能存储两个值,所以Bitmap结构可以天然去重。

3)快速定位

如果我有一个需求,比如想判断数字“3”是否存在于该集合中。若是传统的数字集合存储,那就要逐个遍历每个元素进行判断,时间复杂度为O(N)。

但是若是Bitmap存储只要查看对应的下标数的值是0还是1即可,时间复杂度为O(1)。

查询3

7654→3210
00100110

好处三:非常方便快速的查询某个元素是否在集合中。

4)集合间计算

如果我有另一个集合2、3、7,我想查询这两个集合的交集。

传统方式[1,2,5]与[2,3,7] 取交集就要两层循环遍历。

而Bitmap只要把00100110和10001100进行与操作就行了。而计算机做与、或、非、异或 等等操作是非常快的。

76543210
10001100

&

76543210
00100110

=

76543210
00000100

好处四:集合与集合之间的运行非常快。

5)优势场景

综上,Bitmap非常适合的场景:

  1. 海量数据的压缩存储
  2. 去重存储
  3. 判断值存在于集合
  4. 集合之间的交并差

6)局限性

当然这种方式也有局限性:

  1. 只能存储数字而不是字符串
  2. 存储的值必须是无序不重复

无序:bitmap中保存的元素,不知道哪个先来哪个后到

不重复:天然去重,重复数据不能保存

  1. 不适合存储稀疏的集合,比如一个集合存了三个数[5,1230000,88880000] 这三个数,用Bitmap存储的话其实就不太划算。(但是clickhouse使用的RoaringBitmap,优化了这个稀疏问题。)

ch、redis都有bitmap,ch又对bitmap进行了优化,主要解决了稀疏问题

  • RoaringBitmap:把所有数据分段存储
  • Bitmap结构
    稠密
  • 普通的数据结构
    稀疏
  • 用哪种结构取决于数据的稠密程度

5.2Bitmap在用户分群中的应用

1)现状

首先,如下是用户的标签宽表,

用户性别年龄偏好
190后数码
270后书籍
390后美食
480后书籍
590后美食

如果想根据标签划分人群,比如:90后 + 偏好美食。

2)传统解决方案

那么无非对列值进行遍历筛选,如果优化也就是列上建立索引,但是当这张表有1000个标签列时,如果要索引生效并不是每列有索引就行,要每种查询组合建一个索引才能生效,索引数量相当于1000个列排列组合的个数,这显然是不可能的。

3)更好的方案

那么更好的办法是按字段重组成Bitmap

年龄Bitmap
90后1,3,5
80后4
70后2
性别Bitmap
1,2,3
4,5
偏好Bitmap
数码1
美食3,5
书籍2,4

如果能把数据调整成这样的结构,想进行条件组合,那就简单了.

比如: [美食] + [90后] = Bitmap[3,5] & Bitmap[1,3,5] = 3,5 这个计算速度相比宽表条件筛选是非常非常快的。

5.3 在clickhouse中使用Bitmap表

还是以上面的表举例:

1)建表和数据

create table user_tag_merge 
(   
    uid UInt64,
	gender String,
	agegroup String,
	favor String
)engine=MergeTree()
order by (uid);

模拟数据

insert into user_tag_merge values(1,'M','90后','sm');
insert into user_tag_merge values(2,'M','70后','sj');
insert into user_tag_merge values(3,'M','90后','ms');
insert into user_tag_merge values(4,'F','80后','sj');
insert into user_tag_merge values(5,'F','90后','ms');

Bitmap表

create table user_tag_value_string
 ( 
    tag_code String,
	tag_value String ,
	us AggregateFunction(groupBitmap,UInt64)
)engine=AggregatingMergeTree()
 partition by  (tag_code) 
 order by (tag_value);

Bitmap表必须选择AggregatingMergeTree引擎。

对应的Bitmap字段,必须是AggregateFunction(groupBitmap,UInt64)

  • groupBitmap标识数据的聚合方式
  • UInt64标识最大可存储的数字长度

业务结构上,稍作了调整。把不同的标签放在了同一张表中,但是因为根据tag_code进行了分区,所以不同的标签实质上还是物理分开的。

标签(tag_code)标签值Bitmap
年龄段90后1,3,5
年龄段80后4
年龄段70后2
性别1,2,3
性别4,5
偏好数码1
偏好美食3,5
偏好书籍2,4

2)处理步骤

(1)首先第一步要把已有的宽表数据保存进

select agegroup , gender  , favor  ,uid from user_tag_merge

(2)每个值前面,补上字段名,用()组合成元组

select  ('agegroup', agegroup ) ,
   ('gender',gender ) , ('favor',favor )    ,uid 
from user_tag_merge

(3)每个列用[]拼接成数组

select  [ ('agegroup', agegroup  ) ,
     ('gender',gender ) , 
	 ('favor',favor )]  tag_code_value ,uid 
from user_tag_merge

(4)用arrayJoin炸开,类似于hive中的explode

SELECT
    arrayJoin([('agegroup', agegroup), ('gender', gender), ('favor', favor)]) AS tag_code_value,
    uid
FROM user_tag_merge

(5) 把元组中的字段名和字段值拆开,并用这两个作为维度聚合uid

SELECT
    tag_code_value.1 AS tag_code,
    tag_code_value.2 AS tag_value,
    groupArray(uid) AS us
FROM 
(
    SELECT
        arrayJoin([('agegroup', agegroup), ('gender', gender), ('favor', favor)]) AS tag_code_value,
        uid
    FROM user_tag_merge
) AS tv
GROUP BY
    tag_code_value.1,
    tag_code_value.2

(6) 这已经和我们要求的结果非常接近了。只不过关于用户id的聚合,groupArray实现的是拼接成用户id的数组,而我们想要的聚合是,聚合成一个Bitmap。

那只要改一下聚合函数就可以了。

把groupArray 替换成 groupBitmapState

SELECT
    tag_code_value.1 AS tag_code,
    tag_code_value.2 AS tag_value,
    groupBitmapState (uid) AS us
FROM 
(
    SELECT
        arrayJoin([('agegroup', agegroup), ('gender', gender), ('favor', favor)]) AS tag_code_value,
        uid
    FROM user_tag_merge
) AS tv
GROUP BY
    tag_code_value.1,
    tag_code_value.2

bitmapToArray 把bitmap转成数组 bitmapToArray(groupBitmapState (uid))

这里聚合成bitmap的列没有显示是正常的,因为bitmap的结构本身无法用正常文本显示。

(7)接下来我们可以插入到bitmap表中。

insert into user_tag_value_string
select tag_code_value.1 as tag_code,tag_code_value.2 as tag_value ,
groupBitmapState( uid ) us
from (
SELECT
    arrayJoin([('agegroup', agegroup), ('gender', gender), ('favor', favor)]) AS tag_code_value,
     uid 
FROM user_tag_merge
)tv 
 group by tag_code_value.1,tag_code_value.2

插入完成后,查询依旧不会正常显示

5.4 对Bitmap进行查询

1)条件组合查询

比如我想查询[90后]+[美食]的用户

select bitmapToArray( bitmapAnd( ( select us from user_tag_value_string where tag_value=‘ms’ and tag_code=‘favor’ ) , ( select us from user_tag_value_string where tag_value=‘90后’ and tag_code=‘agegroup’ ) ) )as res

这里首先用条件筛选出us, 每个代表一个Bitmap结构的uid集合,找到两个Bitmap后用bitmapAnd函数求交集。 然后为了观察结果用bitmapToArray函数转换成可见的数组。

2)范围值查询

比如要取

[90后]或者[80后] + [美食]

或者,消费金额大于1000 + [女性]

select  bitmapToArray(  
    bitmapAnd(
        ( select   groupBitmapMergeState(us) us from user_tag_value_string where tag_value='ms' and tag_code='favor'  ),
        ( select   groupBitmapMergeState(us) from user_tag_value_string where tag_value in ('90后','80后') and tag_code='agegroup'  ) )
)as res

因为查询时,有可能需要针对某一个标签,取多个值,甚至是一个区间范围,那就会涉及多个值的userId集合,因此需要在子查询内部用 groupBitmapMergeState 进行一次合并,其实就多个集合取并集。

3)函数总结

函数
arrayJoin宽表转Bitmap表需要行转列,要用arrayJoin把多列数组炸成行。
groupBitmapState一种把聚合列的数字值聚合成Bitmap的聚合函数,普通值转成bitmap
bitmapAnd求两个Bitmap值的交集(与运算)
bitmapOr求两个Bitmap值的并集(或运算)
bitmapXor求两个Bitmap值的差集(异或运算)
bitmapToArray把Bitmap转换成数值数组
groupBitmapMergeState把一列中多个bitmap值进行并集聚合。
bitmapCardinality

为什么要迁移及使用

标签计算完成后保存在hive虽然可以查询但是性能非常糟糕。而标签的使用往往是即时的。最常见的场景就是“用户分群”,也称“人群圈选”、“圈人”等等。

分群操作就是根据多个标签组合,产生一个用户集合,供营销、广告等部门使用。而这些操作计算量大,产生结果需要时效性高。

方案选型

选择方案最重要的依据就是数据量和时效性要求。

时效性数据量分群方案
能接受隔天无所谓HIVE宽表
即时产生千万以下OLAP宽表(Elasticsearh,Tidb,Clickhouse…)
即时产生亿级Bitmap方式(Clickhouse,Doris)

从Hive导入ClickHouse

工具类

object MyClickHouseUtil 
    private val properties=new Properties();
    properties.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.
      getResourceAsStream(""config.properties"") , "UTF-8"))
    val CLICKHOUSE_URL = properties.getProperty("clickhouse.url")
    def executeSql(sql: String ): Unit =
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        val connection: Connection = DriverManager.getConnection(CLICKHOUSE_URL, null, null)
        val  statement: Statement = connection.createStatement()
        statement.execute(sql)
        connection.close()
    

先通过把数据查询成为Dataframe ,再通过行动算子写入至Clickhouse的宽表。

object TaskTransformChApp 
    def main(args: Array[String]): Unit = 
        // 读取宽表数据
        val sparkConf: SparkConf = new SparkConf().setAppName("HiveToCH")//.setMaster("local[*]")
        val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
        val properties=new Properties();
    	properties.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.
      getResourceAsStream(""config.properties"") , "UTF-8"))
        //用户画像的数据库名 hive
        val userProfileDbName = properties.getProperty("user-profile.dbname")
        //数仓数据库名 hive
        val wareHouseDbName = properties.getProperty("data-warehouse.dbname")
        //hdfs地址
        val hdfsStorePath = properties.getProperty("hdfs-store.path")
        //jdbc:clickhouse://***:***/user_profile
        val clickhouseUrl = properties.getProperty("clickhouse.url")
        // 得到宽表的列数据
        val tableName="up_tag_merge_" + taskDate.replace("-","")
        //查询
        val tableDf: DataFrame = sparkSession.sql("select * from user_profile."+tableName).toDF()
        val columns: Array[String] = tableDf.columns
        // 形成RDD
        val dropTableSql= s"drop table IF   EXISTS  $userProfileDbName.$tableName"
        MyClickHouseUtil.executeSql(dropTableSql,clickhouseUrl)
        
        //建立clickhouse表
        //create table user_info
        //(
        //    uid String,
        // gender String,
        // city String,
        // date String
        //)engine=MergeTree()
        //order by (uid);
        val columnsSql: String =   columns.map(column => column.toLowerCase+" String").mkString(",")
        val createTableSql= "create table  user_profile."+tableName + "("+columnsSql+") engine =MergeTree() order by uid"
        MyClickHouseUtil.executeSql(createTableSql,clickhouseUrl)
        // 利用jdbc算子写入clickhouse
        tableDf.write.mode(SaveMode.Append)
        .option("batchsize", "100") // 尽量以批次的形式写入clickhouse
        .option("isolationLevel", "NONE") // 关闭事务
        .option("numPartitions", "4") // 设置并发
        .option("driver","ru.yandex.clickhouse.ClickHouseDriver")
        .jdbc(clickhouseUrl,tableName,new Properties())
    

以上是关于ClickHouse实践的主要内容,如果未能解决你的问题,请参考以下文章

clickhouse基于ClickHouse的海量数据交互式OLAP分析场景实践

微信 ClickHouse 实时数仓的最佳实践

极客星球|Clickhouse在数据智能公司的应用与实践

clickhouse实践关于clickhouse对空值的处理总结

ClickHouse 在网易的实践

周路:为什么是ClickHouse?eBay广告数据平台架构实践!