ClickHouse实践
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ClickHouse实践相关的知识,希望对你有一定的参考价值。
1.OLAP详解
OLTP + OLAP: T:transaction 事务处理 侧重于增删改 A : analysis 分析 Select大批量数据的聚合查询事务处理作用:保证数据的一致性,如果涉及到事务操作,这个操作的执行效率必然不高
OLAP + OLTP =====> 同时满足,很难涉及
mysql: insert update delete Hive ClickHouse: Select 查询分析的高效
读模式 + 写模式 OLAP一般都是读模式, OLTP 写模式 ClickHouse一出来,界限模糊了。 ClickHouse 写模式+ OLAP
海量数据做查询分析高效: 列式数据库, 写模式(保证同一列的数据类型是一样的: 方便压缩),排序
OLAP体系的重要三个特点: 排序 + 写模式 + 列式数据库
ClickHouse 全部都具备!
1.1 OLAP的场景分析
1).读多于写
不同于事务处理(OLTP)的场景,比如电商场景中加购物车、下单、支付等需要在原地进行大量insert、update、delete操作,数据分析(OLAP)场景通常是将数据批量导入后,进行任意维度的灵活探索、BI工具洞察、报表制作等。
数据一次性写入后,分析师需要尝试从各个角度对数据做挖掘、分析,直到发现其中的商业价值、业务变化趋势等信息。这是一个需要反复试错、不断调整、持续优化的过程,其中数据的读取次数远多于写入次数。这就要求底层数据库为这个 特点做专门设计,而不是盲目采用传统数据库的技术架构。
2).大宽表,读大量行但是少量列,结果集较小
在OLAP场景中,通常存在一张或是几张多列的大宽表,列数高达数百甚至数千列。对数据分析处理时,选择其中的少数几列作为维度列、其他少数几列作为指标列,然后对全表或某一个较大范围内的数据做聚合计算。这个过程会扫描大量的行数据,但是只用到了其中的少数列。而聚合计算的结果集相比于动辄数十亿的原始数据,也明显小得多。
例如:查询公司每个部门人有多少。
select department, count(id) as total from compant group by department;
3).数据批量写入,且数据不更新或少更新
OLTP类业务对于延时(Latency)要求更高,要避免让客户等待造成业务损失;而OLAP类业务,由于数据量非常大,通常更加关注写入吞吐(Throughput),要求海量数据能够尽快导入完成。一旦导入完成,历史数据往往作为存档,不会再做更新、删除操作
4).无需事务,数据一致性要求低
OLAP类业务对于事务需求较少,通常是导入历史日志数据,或搭配一款事务型数据库并实时从事务型数据库中进行数据同步。多数OLAP系统都支持最终一致性。
5).灵活多变,不适合预先建模
分析场景下,随着业务变化要及时调整分析维度、挖掘方法,以尽快发现数据价值、更新业务指标。而数据仓库中通常存储着海量的历史数据,调整代价十分高昂。预先建模技术虽然可以在特定场景中加速计算,但是无法满足业务灵活多变的发展需求,维护成本过高。
2.ClickHouse
DataType
数据的序列化和反序列化工作由DataType负责。IDataType接口定义了许多正反序列化的方法,它们成对出现,例如serializeBinary和deserializeBinary、serializeTextJSON和deserializeTextJSON等,涵盖了常用的二进制、文本、JSON、XML、CSV和Protobuf等多种格式类型。IDataType也使用了泛化的设计模式,具体方法的实现逻辑由对应数据类型的实例承载,例如DataTypeString、DataTypeArray及DataTypeTuple等。
DataType虽然负责序列化相关工作,但它并不直接负责数据的读取,而是转由从Column或Field对象获取。在DataType的实现类中,聚合了相应数据类型的Column对象和Field对象。例如,DataTypeString会引用字符串类型的ColumnString,而DataTypeArray则会引用数组类型的ColumnArray,以此类推。
Block与Block流
ClickHouse内部的数据操作是面向Block对象进行的,并且采用了流的形式。虽然Column和Filed组成了数据的基本映射单元,但对应到实际操作,它们还缺少了一些必要的信息,比如数据的类型及列的名称。于是ClickHouse设计了Block对象,Block对象可以看作数据表的子集。Block对象的本质是由数据对象、数据类型和列名称组成的三元组,即Column、DataType及列名称字符串。Column提供了数据的读取能力,而DataType知道如何正反序列化,所以Block在这些对象的基础之上实现了进一步的抽象和封装,从而简化了整个使用的过程,仅通过Block对象就能完成一系列的数据操作。在具体的实现过程中,Block并没有直接聚合Column和DataType对象,而是通过 ColumnWithTypeAndName对象进行间接引用。
有了Block对象这一层封装之后,对Block流的设计就是水到渠成的事情了。流操作有两组顶层接口:IBlockInputStream负责数据的读取和关系运算,IBlockOutputStream负责将数据输出到下一环节。Block流也使用了泛化的设计模式,对数据的各种操作最终都会转换成其中一种流的实现。IBlockInputStream接口定义了读取数据的若干个read虚方法,而具体的实现逻辑则交由它的实现类来填充。
IBlockInputStream接口总共有60多个实现类,它们涵盖了ClickHouse数据摄取的方方面面。这些实现类大致可以分为三类:
第 一类用于处理数据定义的DDL操作,例如DDLQueryStatusInputStream 等;
第二类用于处理关系运算的相关操作,例如LimitBlockInputStream、JoinBlockInputStream及AggregatingBlockInputStream等;
第三类则是与表引擎呼应,每一种表引擎都拥有与之对应的BlockInputStream实现,例如MergeTreeBaseSelectBlockInputStream(MergeTree表引擎)、TinyLogBlockInputStream(TinyLog表引擎)及KafkaBlockInputStream(Kafka表引擎)等。
IBlockOutputStream的设计与IBlockInputStream如出一辙。IBlockOutputStream接口同样也定义了若干写入数据的write虚方法。它的实现类比IBlockInputStream要少许多,一共只有20多种。这些实现类基本用于表引擎的相关处理,负责将数据写入下一环节或者最终目的地,例如MergeTreeBlockOutputStream、 TinyLogBlockOutputStream及StorageFileBlock-OutputStream等。
Table
在数据表的底层设计中并没有所谓的Table对象,它直接使用 IStorage接口指代数据表。表引擎是ClickHouse的一个显著特性,不同的表引擎由不同的子类实现,例如IStorageSystemOneBlock(系统表)、StorageMergeTree(合并树表引擎)和StorageTinyLog(日志表引擎)等。IStorage接口定义了DDL(如ALTER、RENAME、OPTIMIZE和DROP等)、read和write方法,它们分别负责数据的定义、查询与写入。在数据查询时,IStorage负责根据AST查询语句的指示要求,返回指定列的原始数据。后续对数据的进一步加工、计算和过滤,则会统一交由Interpreter解释器对象处理。对Table发起的一次操作通常都会经历这样的过程,接收AST查询语句,根据AST返回指定列的数据,之后再将数据交由Interpreter做进一步处理。
Parser与Interpreter
Parser和Interpreter是非常重要的两组接口:Parser分析器负责创建AST对象;而Interpreter解释器则负责解释AST,并进一步创建查询的执行管道。它们与IStorage一起,串联起了整个数据查询的过
程。Parser分析器可以将一条SQL语句以递归下降的方法解析成AST语法树的形式。不同的SQL语句,会经由不同的Parser实现类解析。例如,有负责解析DDL查询语句的ParserRenameQuery、ParserDropQuery和ParserAlterQuery解析器,也有负责解析INSERT语句的 ParserInsertQuery解析器,还有负责SELECT语句的 ParserSelectQuery等。
Interpreter解释器的作用就像Service服务层一样,起到串联整个查询过程的作用,它会根据解释器的类型,聚合它所需要的资源。首先它会解析AST对象;然后执行“业务逻辑”(例如分支判断、设置参数、调用接口等);最终返回IBlock对象,以线程的形式建立起一个查询执行管道。
Functions与Aggregate Functions
ClickHouse主要提供两类函数——普通函数和聚合函数。普通函数由IFunction接口定义,拥有数十种函数实现,例如FunctionFormatDateTime、FunctionSubstring等。除了一些常见的函数(诸如四则运算、日期转换等)之外,也不乏一些非常实用的函数,例如网址提取函数、IP地址脱敏函数等。普通函数是没有状态的,函数效果作用于每行数据之上。当然,在函数具体执行的过程中,并不会一行一行地运算,而是采用向量化的方式直接作用于一整列数据。
聚合函数由IAggregateFunction接口定义,相比无状态的普通函数,聚合函数是有状态的。以COUNT聚合函数为例,其AggregateFunctionCount的状态使用整型UInt64记录。聚合函数的状态支持序列化与反序列化,所以能够在分布式节点之间进行传输,以实现增量计算。
2.1苏宁选择ClickHouse的原因
1)速度快
2)特性发布快
3)软件质量高
4)物化视图
5)高基数查询
6)精准去重计数
2.2 ClickHouse使用场景
1)适用场景
- web和app数据分析
- 广告网络和RTB
- 电信
- 电子商务和金融
- 信息安全
- 监测
- 时序数据
- 商业智能
- 在线游戏
- 物联网
2)不适用场景
- 事务性工作(OLTP)
- 高并发的键值访问
- 文档存储
- 超标准化的数据
2.3 ClickHouse的优点
1)真正的面向列的DBMS(ClickHouse是一个DBMS,而不是一个单一的数据库。它允许在运行时创建表和数据库、加载数据和运行查询,而无需重新配置和重新启动服务器)
2)数据压缩(一些面向列的DBMS(INFINIDB CE 和 MonetDB)不使用数据压缩。但是,数据压缩确实是提高了性能)
3)磁盘存储的数据(许多面向列的DBMS(SPA HANA和GooglePowerDrill))只能在内存中工作。但即使在数千台服务器上,内存也太小了。)
4)多核并行处理(多核多节点并行化大型查询)
5)在多个服务器上分布式处理(在clickhouse中,数据可以驻留在不同的分片上。每个分片都可以用于容错的一组副本,查询会在所有分片上并行处理)
6)SQL支持(ClickHouse sql 跟真正的sql有不一样的函数名称。不过语法基本跟SQL语法兼容,支持JOIN/FROM/IN 和JOIN子句及标量子查询支持子查询)
7)向量化引擎(数据不仅按列式存储,而且由矢量-列的部分进行处理,这使得开发者能够实现高CPU性能)
8)实时数据更新(ClickHouse支持主键表。为了快速执行对主键范围的查询,数据使用合并树(MergeTree)进行递增排序。由于这个原因,数据可以不断地添加到表中)
9)支持近似计算(统计全国到底有多少人?143456754 14.3E)
10)数据复制和对数据完整性的支持(ClickHouse使用异步多主复制。写入任何可用的复本后,数据将分发到所有剩余的副本。系统在不同的副本上保持相同的数据。数据在失败后自动恢复)
2.4 ClickHouse的缺点
ClickHouse 作为一个被设计用来在实时分析的 OLAP 组件,只是在高效率的分析方面性能发挥到极致,那必然就会在其他方面做出取舍:
1)没有完整的事务支持,不支持Transaction想快就别Transaction
2)缺少完整Update/Delete操作,缺少高频率、低延迟的修改或删除已存在数据的能力,仅用于批量删除或修改数据。
3)聚合结果必须小于一台机器的内存大小
4)支持有限操作系统,正在慢慢完善
5)开源社区刚刚启动,主要是俄语为主,中文社区:http://www.clickhouse.com.cn
6)不适合Key-value存储,不支持Blob等文档型数据库
3.搜索字符串算法
在字符串搜索方面,针对不同的场景, ClickHouse最终选择了这些算法:对于常量,使用Volnitsky算法;对
于非常量,使用CPU的向量化执行SIMD,暴力优化;正则匹配使用re2 和hyperscan算法。性能是算法选择的首要考量指标。
4.MergeTree
建表语句
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name (
name1 [type] [DEFAULT|MATERIALIZED|ALIAS expr],
name2 [type] [DEFAULT|MATERIALIZED|ALIAS expr],
省略... )
ENGINE = MergeTree()
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, 省略...]
(1)PARTITION BY [选填]:
分区键,用于指定表数3据以何种标准进行分区。分区键既可以是单个列字段,也可以通过元组的形式使用多个列字段,同时它也支持使用列表达式。如果不声明分区键,则ClickHouse会生成一个名为all的分区。合理使用数据分区,可以有效减少查询时数据文件的扫描范围,更多关于数据分区的细节会在6.2节介绍。
(2)ORDER BY [必填]:
排序键,用于指定在一个数据片段内, 数据以何种标准排序。默认情况下主键(PRIMARY KEY)与排序键相同。排序键既可以是单个列字段,例如ORDER BY CounterID,也可以通过元组的形式使用多个列字段,例如ORDERBY(CounterID,EventDate)。当使用多个列字段排序时,以ORDER BY(CounterID,EventDate)为例,在单个数据片段内,数据首先会以CounterID排序,相同CounterID的数据再按EventDate排序。
(3)PRIMARY KEY [选填]:
主键,顾名思义,声明后会依照主键字段生成一级索引,用于加速表查询。默认情况下,主键与排序键(ORDER BY)相同,所以通常直接使用ORDER BY代为指定主键,无须刻意通过PRIMARY KEY声明。所以在一般情况下,在单个数据片段内,数据与一级索引以相同的规则升序排列。与其他数据库不同,MergeTree 主键允许存在重复数据(ReplacingMergeTree可以去重)。
(4)SAMPLE BY [选填]:
抽样表达式,用于声明数据以何种标准进行采样。如果使用了此配置项,那么在主键的配置中也需要声明同样的表达式,例如:
省略...
) ENGINE = MergeTree()
ORDER BY (CounterID, EventDate, intHash32(UserID)
SAMPLE BY intHash32(UserID)
抽样表达式需要配合SAMPLE子查询使用,这项功能对于选取抽样数据十分有用,更多关于抽样查询的使用方法会在第9章介绍。
(5)SETTINGS:index_granularity [选填]:
index_granularity对于MergeTree而言是一项非常重要的参数,它表示索引的粒度,默认值为8192。也就是说,MergeTree的索引在默认情况下,每间隔8192行数据才生成一条索引,其具体声明方式如下所示:
省略...
) ENGINE = MergeTree()
省略...
SETTINGS index_granularity = 8192;
8192是一个神奇的数字,在ClickHouse中大量数值参数都有它的影子,可以被其整除(例如最小压缩块大小
min_compress_block_size:65536)。通常情况下并不需要修改此参数,但理解它的工作原理有助于我们更好地使用MergeTree。关于索引详细的工作原理会在后续阐述。
(6)SETTINGS:index_granularity_bytes [选填]:
在19.11版本之前,ClickHouse只支持固定大小的索引间隔,由index_granularity控制,默认为8192。在新版本中,它增加了自适应间隔大小的特性,即根据每一批次写入数据的体量大小,动态划分间隔大小。而数据的体量大小,正是由index_granularity_bytes参数控制的,默认为10M(10×1024×1024),设置为0表示不启动自适应功能。
(7)SETTINGS:enable_mixed_granularity_parts [选填]:
设置是否开启自适应索引间隔的功能,默认开启。
(8)SETTINGS:merge_with_ttl_timeout [选填]:
从19.6版本开始,MergeTree提供了数据TTL的功能,关于这部分的详细介绍,将
留到第7章介绍。
(9)SETTINGS:storage_policy [选填]:
从19.15版本开始,MergeTree提供了多路径的存储策略,关于这部分的详细介绍,同样留
到第7章介绍。
5.BitMap
5.1为什么用Bitmap
1)存储成本
假设有个1,2,5的数字集合,如果常规的存储方法,要用3个Int32空间。其中一个Int32就是32位的空间。三个就是3*32Bit,相当于12个字节。
如果用Bitmap怎么存储呢,只用8Bit(1个字节)就够了。每一位代表一个数,位号就是数值,1标识有,0标识无。如下图:
7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|
0 | 0 | 1 | 0 | 0 | 1 | 1 | 0 |
这样的一个字节可以存8个整数,每一个数的存储成本实质上是1Bit。
也就是说Bitmap的存储成本是Array[Int32]的1/32,是Array[Int64]的1/64。
好处一: 如果有一个超大的无序且不重复的整数集合,用Bitmap的存储成本是非常低的。
2)天然去重
好处二: 因为每个值都只对应唯一的一个位置,不能存储两个值,所以Bitmap结构可以天然去重。
3)快速定位
如果我有一个需求,比如想判断数字“3”是否存在于该集合中。若是传统的数字集合存储,那就要逐个遍历每个元素进行判断,时间复杂度为O(N)。
但是若是Bitmap存储只要查看对应的下标数的值是0还是1即可,时间复杂度为O(1)。
查询3
7 | 6 | 5 | 4 | →3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|
0 | 0 | 1 | 0 | 0 | 1 | 1 | 0 |
好处三:非常方便快速的查询某个元素是否在集合中。
4)集合间计算
如果我有另一个集合2、3、7,我想查询这两个集合的交集。
传统方式[1,2,5]与[2,3,7] 取交集就要两层循环遍历。
而Bitmap只要把00100110和10001100进行与操作就行了。而计算机做与、或、非、异或 等等操作是非常快的。
7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|
1 | 0 | 0 | 0 | 1 | 1 | 0 | 0 |
&
7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|
0 | 0 | 1 | 0 | 0 | 1 | 1 | 0 |
=
7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|
0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
好处四:集合与集合之间的运行非常快。
5)优势场景
综上,Bitmap非常适合的场景:
- 海量数据的压缩存储
- 去重存储
- 判断值存在于集合
- 集合之间的交并差
6)局限性
当然这种方式也有局限性:
- 只能存储数字而不是字符串
- 存储的值必须是无序不重复
无序:bitmap中保存的元素,不知道哪个先来哪个后到
不重复:天然去重,重复数据不能保存
- 不适合存储稀疏的集合,比如一个集合存了三个数[5,1230000,88880000] 这三个数,用Bitmap存储的话其实就不太划算。(但是clickhouse使用的RoaringBitmap,优化了这个稀疏问题。)
ch、redis都有bitmap,ch又对bitmap进行了优化,主要解决了稀疏问题
- RoaringBitmap:把所有数据分段存储
- Bitmap结构
稠密 - 普通的数据结构
稀疏 - 用哪种结构取决于数据的稠密程度
5.2Bitmap在用户分群中的应用
1)现状
首先,如下是用户的标签宽表,
用户 | 性别 | 年龄 | 偏好 |
---|---|---|---|
1 | 男 | 90后 | 数码 |
2 | 男 | 70后 | 书籍 |
3 | 男 | 90后 | 美食 |
4 | 女 | 80后 | 书籍 |
5 | 女 | 90后 | 美食 |
如果想根据标签划分人群,比如:90后 + 偏好美食。
2)传统解决方案
那么无非对列值进行遍历筛选,如果优化也就是列上建立索引,但是当这张表有1000个标签列时,如果要索引生效并不是每列有索引就行,要每种查询组合建一个索引才能生效,索引数量相当于1000个列排列组合的个数,这显然是不可能的。
3)更好的方案
那么更好的办法是按字段重组成Bitmap
年龄 | Bitmap |
---|---|
90后 | 1,3,5 |
80后 | 4 |
70后 | 2 |
性别 | Bitmap |
---|---|
男 | 1,2,3 |
女 | 4,5 |
偏好 | Bitmap |
---|---|
数码 | 1 |
美食 | 3,5 |
书籍 | 2,4 |
如果能把数据调整成这样的结构,想进行条件组合,那就简单了.
比如: [美食] + [90后] = Bitmap[3,5] & Bitmap[1,3,5] = 3,5 这个计算速度相比宽表条件筛选是非常非常快的。
5.3 在clickhouse中使用Bitmap表
还是以上面的表举例:
1)建表和数据
create table user_tag_merge
(
uid UInt64,
gender String,
agegroup String,
favor String
)engine=MergeTree()
order by (uid);
模拟数据
insert into user_tag_merge values(1,'M','90后','sm');
insert into user_tag_merge values(2,'M','70后','sj');
insert into user_tag_merge values(3,'M','90后','ms');
insert into user_tag_merge values(4,'F','80后','sj');
insert into user_tag_merge values(5,'F','90后','ms');
Bitmap表
create table user_tag_value_string
(
tag_code String,
tag_value String ,
us AggregateFunction(groupBitmap,UInt64)
)engine=AggregatingMergeTree()
partition by (tag_code)
order by (tag_value);
Bitmap表必须选择AggregatingMergeTree引擎。
对应的Bitmap字段,必须是AggregateFunction(groupBitmap,UInt64)
- groupBitmap标识数据的聚合方式
- UInt64标识最大可存储的数字长度
业务结构上,稍作了调整。把不同的标签放在了同一张表中,但是因为根据tag_code进行了分区,所以不同的标签实质上还是物理分开的。
标签(tag_code) | 标签值 | Bitmap |
---|---|---|
年龄段 | 90后 | 1,3,5 |
年龄段 | 80后 | 4 |
年龄段 | 70后 | 2 |
性别 | 男 | 1,2,3 |
性别 | 女 | 4,5 |
偏好 | 数码 | 1 |
偏好 | 美食 | 3,5 |
偏好 | 书籍 | 2,4 |
2)处理步骤
(1)首先第一步要把已有的宽表数据保存进
select agegroup , gender , favor ,uid from user_tag_merge
(2)每个值前面,补上字段名,用()组合成元组
select ('agegroup', agegroup ) ,
('gender',gender ) , ('favor',favor ) ,uid
from user_tag_merge
(3)每个列用[]拼接成数组
select [ ('agegroup', agegroup ) ,
('gender',gender ) ,
('favor',favor )] tag_code_value ,uid
from user_tag_merge
(4)用arrayJoin炸开,类似于hive中的explode
SELECT
arrayJoin([('agegroup', agegroup), ('gender', gender), ('favor', favor)]) AS tag_code_value,
uid
FROM user_tag_merge
(5) 把元组中的字段名和字段值拆开,并用这两个作为维度聚合uid
SELECT
tag_code_value.1 AS tag_code,
tag_code_value.2 AS tag_value,
groupArray(uid) AS us
FROM
(
SELECT
arrayJoin([('agegroup', agegroup), ('gender', gender), ('favor', favor)]) AS tag_code_value,
uid
FROM user_tag_merge
) AS tv
GROUP BY
tag_code_value.1,
tag_code_value.2
(6) 这已经和我们要求的结果非常接近了。只不过关于用户id的聚合,groupArray实现的是拼接成用户id的数组,而我们想要的聚合是,聚合成一个Bitmap。
那只要改一下聚合函数就可以了。
把groupArray 替换成 groupBitmapState
SELECT
tag_code_value.1 AS tag_code,
tag_code_value.2 AS tag_value,
groupBitmapState (uid) AS us
FROM
(
SELECT
arrayJoin([('agegroup', agegroup), ('gender', gender), ('favor', favor)]) AS tag_code_value,
uid
FROM user_tag_merge
) AS tv
GROUP BY
tag_code_value.1,
tag_code_value.2
bitmapToArray 把bitmap转成数组 bitmapToArray(groupBitmapState (uid))
这里聚合成bitmap的列没有显示是正常的,因为bitmap的结构本身无法用正常文本显示。
(7)接下来我们可以插入到bitmap表中。
insert into user_tag_value_string
select tag_code_value.1 as tag_code,tag_code_value.2 as tag_value ,
groupBitmapState( uid ) us
from (
SELECT
arrayJoin([('agegroup', agegroup), ('gender', gender), ('favor', favor)]) AS tag_code_value,
uid
FROM user_tag_merge
)tv
group by tag_code_value.1,tag_code_value.2
插入完成后,查询依旧不会正常显示
5.4 对Bitmap进行查询
1)条件组合查询
比如我想查询[90后]+[美食]的用户
select bitmapToArray( bitmapAnd( ( select us from user_tag_value_string where tag_value=‘ms’ and tag_code=‘favor’ ) , ( select us from user_tag_value_string where tag_value=‘90后’ and tag_code=‘agegroup’ ) ) )as res
这里首先用条件筛选出us, 每个代表一个Bitmap结构的uid集合,找到两个Bitmap后用bitmapAnd函数求交集。 然后为了观察结果用bitmapToArray函数转换成可见的数组。
2)范围值查询
比如要取
[90后]或者[80后] + [美食]
或者,消费金额大于1000 + [女性]
select bitmapToArray(
bitmapAnd(
( select groupBitmapMergeState(us) us from user_tag_value_string where tag_value='ms' and tag_code='favor' ),
( select groupBitmapMergeState(us) from user_tag_value_string where tag_value in ('90后','80后') and tag_code='agegroup' ) )
)as res
因为查询时,有可能需要针对某一个标签,取多个值,甚至是一个区间范围,那就会涉及多个值的userId集合,因此需要在子查询内部用 groupBitmapMergeState
进行一次合并,其实就多个集合取并集。
3)函数总结
函数 | |
---|---|
arrayJoin | 宽表转Bitmap表需要行转列,要用arrayJoin把多列数组炸成行。 |
groupBitmapState | 一种把聚合列的数字值聚合成Bitmap的聚合函数,普通值转成bitmap |
bitmapAnd | 求两个Bitmap值的交集(与运算) |
bitmapOr | 求两个Bitmap值的并集(或运算) |
bitmapXor | 求两个Bitmap值的差集(异或运算) |
bitmapToArray | 把Bitmap转换成数值数组 |
groupBitmapMergeState | 把一列中多个bitmap值进行并集聚合。 |
bitmapCardinality |
为什么要迁移及使用
标签计算完成后保存在hive虽然可以查询但是性能非常糟糕。而标签的使用往往是即时的。最常见的场景就是“用户分群”,也称“人群圈选”、“圈人”等等。
分群操作就是根据多个标签组合,产生一个用户集合,供营销、广告等部门使用。而这些操作计算量大,产生结果需要时效性高。
方案选型
选择方案最重要的依据就是数据量和时效性要求。
时效性 | 数据量 | 分群方案 |
---|---|---|
能接受隔天 | 无所谓 | HIVE宽表 |
即时产生 | 千万以下 | OLAP宽表(Elasticsearh,Tidb,Clickhouse…) |
即时产生 | 亿级 | Bitmap方式(Clickhouse,Doris) |
从Hive导入ClickHouse
工具类
object MyClickHouseUtil
private val properties=new Properties();
properties.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.
getResourceAsStream(""config.properties"") , "UTF-8"))
val CLICKHOUSE_URL = properties.getProperty("clickhouse.url")
def executeSql(sql: String ): Unit =
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
val connection: Connection = DriverManager.getConnection(CLICKHOUSE_URL, null, null)
val statement: Statement = connection.createStatement()
statement.execute(sql)
connection.close()
先通过把数据查询成为Dataframe ,再通过行动算子写入至Clickhouse的宽表。
object TaskTransformChApp
def main(args: Array[String]): Unit =
// 读取宽表数据
val sparkConf: SparkConf = new SparkConf().setAppName("HiveToCH")//.setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val properties=new Properties();
properties.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.
getResourceAsStream(""config.properties"") , "UTF-8"))
//用户画像的数据库名 hive
val userProfileDbName = properties.getProperty("user-profile.dbname")
//数仓数据库名 hive
val wareHouseDbName = properties.getProperty("data-warehouse.dbname")
//hdfs地址
val hdfsStorePath = properties.getProperty("hdfs-store.path")
//jdbc:clickhouse://***:***/user_profile
val clickhouseUrl = properties.getProperty("clickhouse.url")
// 得到宽表的列数据
val tableName="up_tag_merge_" + taskDate.replace("-","")
//查询
val tableDf: DataFrame = sparkSession.sql("select * from user_profile."+tableName).toDF()
val columns: Array[String] = tableDf.columns
// 形成RDD
val dropTableSql= s"drop table IF EXISTS $userProfileDbName.$tableName"
MyClickHouseUtil.executeSql(dropTableSql,clickhouseUrl)
//建立clickhouse表
//create table user_info
//(
// uid String,
// gender String,
// city String,
// date String
//)engine=MergeTree()
//order by (uid);
val columnsSql: String = columns.map(column => column.toLowerCase+" String").mkString(",")
val createTableSql= "create table user_profile."+tableName + "("+columnsSql+") engine =MergeTree() order by uid"
MyClickHouseUtil.executeSql(createTableSql,clickhouseUrl)
// 利用jdbc算子写入clickhouse
tableDf.write.mode(SaveMode.Append)
.option("batchsize", "100") // 尽量以批次的形式写入clickhouse
.option("isolationLevel", "NONE") // 关闭事务
.option("numPartitions", "4") // 设置并发
.option("driver","ru.yandex.clickhouse.ClickHouseDriver")
.jdbc(clickhouseUrl,tableName,new Properties())
以上是关于ClickHouse实践的主要内容,如果未能解决你的问题,请参考以下文章
clickhouse基于ClickHouse的海量数据交互式OLAP分析场景实践