ClickHouse External Storage Table Engines (HDFS, MySQL, Kafka)
Posted by Bulut0907
External storage table engines are responsible only for metadata management and data querying; the data itself stays in the external system.
1. HDFS Table Engine
For installing HDFS, see Fully Distributed Installation of Hadoop 3.3.1 on CentOS 7.
1.1 Preparation
- The HDFS table engine does not support Kerberos authentication; if Kerberos is enabled on HDFS, disable it first
- Create a clickhouse directory on HDFS
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# hadoop fs -mkdir /clickhouse
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# hadoop fs -ls /
Found 1 items
drwxr-xr-x - root supergroup 0 2021-07-21 08:15 /clickhouse
[root@clickhouse1 ~]#
- Add a new ClickHouse cluster
A distributed HDFS table does not need replication, so we add a new ClickHouse cluster. Inside the clickhouse_remote_servers tag of /etc/clickhouse-server/config.d/metrika.xml, add the following:
<sharding_cluster>
<shard>
<replica>
<host>clickhouse1</host>
<port>9000</port>
<user>default</user>
<password>default123</password>
<weight>1</weight>
</replica>
</shard>
<shard>
<replica>
<host>clickhouse2</host>
<port>9000</port>
<user>default</user>
<password>default123</password>
<weight>1</weight>
</replica>
</shard>
<shard>
<replica>
<host>clickhouse3</host>
<port>9000</port>
<user>default</user>
<password>default123</password>
<weight>1</weight>
</replica>
</shard>
<shard>
<replica>
<host>clickhouse4</host>
<port>9000</port>
<user>default</user>
<password>default123</password>
<weight>1</weight>
</replica>
</shard>
</sharding_cluster>
Then restart the ClickHouse server on every node, for example:
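On a systemd-based install (assuming the standard service name), a quick sketch of restarting and verifying that the new cluster is visible:

# restart the server, then confirm the new cluster definition was loaded
systemctl restart clickhouse-server
clickhouse-client --password default123 --query "select host_name, port from system.clusters where cluster = 'sharding_cluster'"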
1.2 HDFS Table for Both Reads and Writes
- Creating the table on the cluster
clickhouse1 :)
clickhouse1 :) create table hdfs_table1_local on cluster sharding_cluster(
:-] id UInt32,
:-] name String
:-] ) engine = HDFS('hdfs://clickhouse1:9099/clickhouse/hdfs_table1', 'CSV');
CREATE TABLE hdfs_table1_local ON CLUSTER sharding_cluster
(
`id` UInt32,
`name` String
)
ENGINE = HDFS('hdfs://clickhouse1:9099/clickhouse/hdfs_table1', 'CSV')
Query id: 46414107-0399-48dd-99bc-6839b16a8fdd
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 1 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 1 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.327 sec.
clickhouse1 :)
Common file formats for the data are CSV, TSV, and JSON.
Here we create only local tables across the cluster, because inserting through the Distributed table engine requires ZooKeeper coordination and the HDFS table engine does not support setting a ZooKeeper path; the per-node local tables can still be used for read/write separation.
- Inserting data
clickhouse1 :)
clickhouse1 :) insert into hdfs_table1_local select number, concat('code', toString(number)) code from numbers(5);
INSERT INTO hdfs_table1_local SELECT
number,
concat('code', toString(number)) AS code
FROM numbers(5)
Query id: 0fd035df-7ab1-424e-9efc-bffbe42c32cc
Ok.
0 rows in set. Elapsed: 0.043 sec.
clickhouse1 :)
Data can be inserted only once; running the insert again raises DB::Exception: File: /clickhouse/hdfs_table1 is already exists. A possible workaround is sketched below.
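If the same path really does need to be rewritten, newer ClickHouse releases provide HDFS engine settings for this (an assumption; they are not available in every version, so check yours):

-- assumption: requires a ClickHouse version that supports this setting
insert into hdfs_table1_local settings hdfs_truncate_on_insert = 1
select number, concat('code', toString(number)) from numbers(5);
-- alternatively, hdfs_create_new_file_on_insert = 1 writes each insert to a new numbered file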
- Querying data
clickhouse1 :)
clickhouse1 :) select * from hdfs_table1_local;
SELECT *
FROM hdfs_table1_local
Query id: 23a1b6b2-7f8e-4508-ba62-f30b09706981
┌─id─┬─name──┐
│ 0 │ code0 │
│ 1 │ code1 │
│ 2 │ code2 │
│ 3 │ code3 │
│ 4 │ code4 │
└────┴───────┘
5 rows in set. Elapsed: 0.064 sec.
clickhouse1 :)
- Viewing the HDFS file
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# hadoop fs -cat /clickhouse/hdfs_table1
0,"code0"
1,"code1"
2,"code2"
3,"code3"
4,"code4"
[root@clickhouse1 ~]#
The hdfs_table1 file is created only when data is inserted; dropping the table does not delete the file from HDFS.
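Cleaning up therefore has to happen on HDFS itself, for example:

hadoop fs -rm /clickhouse/hdfs_table1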
1.3 HDFS Table for Reads Only
- HDFS path matching rules
Rule | Meaning | Example | Notes |
---|---|---|---|
Absolute path | A single file | /clickhouse/hdfs_table2 | Matches only the hdfs_table2 file |
* wildcard | All files in a directory | /clickhouse/* | Matches every file under the clickhouse directory |
? wildcard | One character of a file name | /clickhouse/hdfs_table? | Matches hdfs_table2, hdfs_tableA, etc. |
{M..N} | A numeric range | /clickhouse/hdfs_table{2..3} | Matches hdfs_table2 and hdfs_table3 |
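These same patterns also work with the hdfs table function for ad-hoc reads without creating a table first; a sketch (the structure string is an assumption matching the files prepared below):

select * from hdfs('hdfs://clickhouse1:9099/clickhouse/hdfs_table2_*', 'CSV', 'id UInt32, name String');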
- Preparing the data
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# hadoop fs -cat /clickhouse/hdfs_table2_1
0,"code0"
1,"code1"
2,"code2"
3,"code3"
4,"code4"
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# hadoop fs -cat /clickhouse/hdfs_table2_2
5,code5
6,code6
7,code7
8,code8
9,code9
[root@clickhouse1 ~]#
The data must not contain blank lines; String fields work with or without double quotes.
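The two files above can be produced and uploaded like this, for example:

printf '0,"code0"\n1,"code1"\n2,"code2"\n3,"code3"\n4,"code4"\n' > hdfs_table2_1
printf '5,code5\n6,code6\n7,code7\n8,code8\n9,code9\n' > hdfs_table2_2
hadoop fs -put hdfs_table2_1 hdfs_table2_2 /clickhouse/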
- Creating the local table on the cluster
clickhouse1 :)
clickhouse1 :) create table hdfs_table2_local on cluster sharding_cluster(
:-] id UInt32,
:-] name String
:-] ) engine = HDFS('hdfs://clickhouse1:9099/clickhouse/hdfs_table2_{1..2}', 'CSV');
CREATE TABLE hdfs_table2_local ON CLUSTER sharding_cluster
(
`id` UInt32,
`name` String
)
ENGINE = HDFS('hdfs://clickhouse1:9099/clickhouse/hdfs_table2_{1..2}', 'CSV')
Query id: 94cdfdf1-f00d-4d21-bbc9-7f451aa656a5
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 1 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.188 sec.
clickhouse1 :)
- Querying data
clickhouse1 :)
clickhouse1 :) select * from hdfs_table2_local;
SELECT *
FROM hdfs_table2_local
Query id: f61d2831-0a00-475d-a205-31d64cde6986
┌─id─┬─name──┐
│ 0 │ code0 │
│ 1 │ code1 │
│ 2 │ code2 │
│ 3 │ code3 │
│ 4 │ code4 │
└────┴───────┘
┌─id─┬─name──┐
│ 5 │ code5 │
│ 6 │ code6 │
│ 7 │ code7 │
│ 8 │ code8 │
│ 9 │ code9 │
└────┴───────┘
10 rows in set. Elapsed: 0.091 sec.
clickhouse1 :)
Each file forms its own partition in the result.
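Which file each row came from can be checked with the engine's virtual columns _path and _file, for example:

select _file, count() from hdfs_table2_local group by _file;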
2. MySQL Table Engine
For installing MySQL, see Installing MySQL 8.0.25 on CentOS 7.
2.1 Basic Operations of the Table Engine
- The data in the MySQL database
mysql>
mysql> select * from test_db.clickhouse_mysql_test;
+----+-------+-----------+
| id | name | city |
+----+-------+-----------+
| 1 | name1 | Beijing |
| 2 | name2 | Shanghai |
| 3 | name3 | Guangzhou |
+----+-------+-----------+
3 rows in set (0.01 sec)
mysql>
- Creating the local table on the cluster
clickhouse1 :)
clickhouse1 :) create table clickhouse_mysql_test_local on cluster sharding_cluster(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = MySQL('clickhouse1:3306', 'test_db', 'clickhouse_mysql_test', 'root', 'Root_123', 0, "update id = id + 1, name = concat('update_', name)");
CREATE TABLE clickhouse_mysql_test_local ON CLUSTER sharding_cluster
(
`id` UInt32,
`name` String,
`city` String
)
ENGINE = MySQL('clickhouse1:3306', 'test_db', 'clickhouse_mysql_test', 'root', 'Root_123', 0, `update id = id + 1, name = concat('update_', name)`)
Query id: 7d3f6fed-c3f3-4482-892a-bd37dc9d7721
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.192 sec.
clickhouse1 :)
The 6th parameter (replace_query) defaults to 0. When it is 1, ClickHouse's INSERT INTO gains REPLACE INTO semantics (when the primary key or a unique index collides, the existing row is deleted and the new row is inserted); in that case the 7th parameter must be 0.
The 7th parameter (on_duplicate_clause) defaults to 0. When it is an update string and an inserted row collides with an existing row on the primary key or a unique index, the update string is applied to the existing row; fields not named in the update string keep their original values.
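For comparison, a minimal sketch of the REPLACE INTO variant (parameter 6 set to 1, parameter 7 omitted; the table name is hypothetical):

create table clickhouse_mysql_replace_local on cluster sharding_cluster(
    id UInt32,
    name String,
    city String
) engine = MySQL('clickhouse1:3306', 'test_db', 'clickhouse_mysql_test', 'root', 'Root_123', 1);
-- with replace_query = 1, inserting a row whose primary key already exists
-- deletes the old row and inserts the new one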
- Inserting data
clickhouse1 :)
clickhouse1 :) insert into clickhouse_mysql_test_local(id, name, city) values(3, 'name4', 'Shenzhen');
INSERT INTO clickhouse_mysql_test_local (id, name, city) VALUES
Query id: ebfb1435-aa19-4c38-9226-98775006d47c
Ok.
1 rows in set. Elapsed: 0.233 sec.
clickhouse1 :)
- Querying data
clickhouse1 :)
clickhouse1 :) select * from clickhouse_mysql_test_local;
SELECT *
FROM clickhouse_mysql_test_local
Query id: 1b8f71e0-86d1-448d-9cf7-0a8614419665
┌─id─┬─name─────────┬─city──────┐
│ 1 │ name1 │ Beijing │
│ 2 │ name2 │ Shanghai │
│ 4 │ update_name3 │ Guangzhou │
└────┴──────────────┴───────────┘
3 rows in set. Elapsed: 0.009 sec.
clickhouse1 :)
2.2 Table Engine with a Materialized View
- Creating the tables
clickhouse1 :)
clickhouse1 :) create table table_mysql_local on cluster sharding_ha(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = ReplicatedMergeTree('/clickhouse/tables/table_mysql/{shard}', '{replica}')
:-] order by id
:-] primary key id
:-] partition by city;
CREATE TABLE table_mysql_local ON CLUSTER sharding_ha
(
`id` UInt32,
`name` String,
`city` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/table_mysql/{shard}', '{replica}')
PARTITION BY city
PRIMARY KEY id
ORDER BY id
Query id: 8aaf2bdd-d8c1-4e3c-a58d-e5adae2552c1
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.191 sec.
clickhouse1 :)
clickhouse1 :) create materialized view view_mysql_all on cluster sharding_ha(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = Distributed(sharding_ha, default, table_mysql_local, id)
:-] populate
:-] as select * from clickhouse_mysql_test_local;
CREATE MATERIALIZED VIEW view_mysql_all ON CLUSTER sharding_ha
(
`id` UInt32,
`name` String,
`city` String
)
ENGINE = Distributed(sharding_ha, default, table_mysql_local, id) POPULATE AS
SELECT *
FROM clickhouse_mysql_test_local
Query id: e9d07014-3bde-4026-8743-6e7ade47cbb7
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.282 sec.
clickhouse1 :)
- Querying data
clickhouse1 :)
clickhouse1 :) select * from view_mysql_all;
SELECT *
FROM view_mysql_all
Query id: 8b523a07-8c1d-48ab-8d51-544e88f623c0
┌─id─┬─name──┬─city─────┐
│ 2 │ name2 │ Shanghai │
└────┴───────┴──────────┘
┌─id─┬─name─────────┬─city──────┐
│ 4 │ update_name3 │ Guangzhou │
└────┴──────────────┴───────────┘
┌─id─┬─name──┬─city────┐
│ 1 │ name1 │ Beijing │
└────┴───────┴─────────┘
3 rows in set. Elapsed: 0.014 sec.
clickhouse1 :)
The MySQL table engine does not support UPDATE or DELETE; the table_mysql_local table could use the CollapsingMergeTree engine to work around this, as sketched below.
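A minimal sketch of that idea (the table name and sign column are hypothetical; a sign of 1 marks a live row and inserting a matching row with sign = -1 cancels it during merges):

create table table_mysql_collapsing_local on cluster sharding_ha(
    id UInt32,
    name String,
    city String,
    sign Int8
) engine = ReplicatedCollapsingMergeTree('/clickhouse/tables/table_mysql_collapsing/{shard}', '{replica}', sign)
order by id;
-- "delete" row id = 1 by inserting a cancelling row:
-- insert into table_mysql_collapsing_local values (1, 'name1', 'Beijing', -1);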
3. Kafka Table Engine
For installing Kafka, see Fully Distributed Installation of Kafka 2.8.0 on CentOS 7.
The Kafka table engine can ingest Kafka data in real time, but it does not support exactly-once semantics.
3.1 Preparing Kafka Test Data
- Creating the topic
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# kafka_2.13-2.8.0/bin/kafka-topics.sh --bootstrap-server clickhouse1:9092,clickhouse2:9092,clickhouse3:9092 --create --topic clickhouse_kafka_test --replication-factor 1 --partitions 3
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic clickhouse_kafka_test.
[root@clickhouse1 ~]#
- Sending test data
[root@clickhouse1 ~]#
[root@clickhouse1 ~]# kafka_2.13-2.8.0/bin/kafka-console-producer.sh --bootstrap-server clickhouse1:9092,clickhouse2:9092,clickhouse3:9092 --topic clickhouse_kafka_test
>{"id":1,"name":"name1","city":"Beijing"}
>{"id":2,"name":"name2","city":"Shanghai"}
>{"id":3,"name":"name3","city":"Guangzhou"}
>
3.2 Kafka Table Engine Operations (a second SELECT returns no data)
- Setting the auto.offset.reset parameter
When the ClickHouse Kafka table engine consumes, auto.offset.reset defaults to latest. Edit /etc/clickhouse-server/config.xml as follows, then restart ClickHouse:
<kafka>
<auto_offset_reset>earliest</auto_offset_reset>
</kafka>
- Creating the table
clickhouse1 :)
clickhouse1 :) create table clickhouse_kafka_test_local(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = Kafka()
:-] settings
:-] kafka_broker_list = 'clickhouse1:9092,clickhouse2:9092,clickhouse3:9092',
:-] kafka_topic_list = 'clickhouse_kafka_test',
:-] kafka_group_name = 'clickhouse_kafka_test_group',
:-] kafka_format = 'JSONEachRow',
:-] kafka_skip_broken_messages = 100;
CREATE TABLE clickhouse_kafka_test_local
(
`id` UInt32,
`name` String,
`city` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'clickhouse1:9092,clickhouse2:9092,clickhouse3:9092', kafka_topic_list = 'clickhouse_kafka_test', kafka_group_name = 'clickhouse_kafka_test_group', kafka_format = 'JSONEachRow', kafka_skip_broken_messages = 100
Query id: 20d1cb23-e3f7-40be-b227-d9a204fd7451
Ok.
0 rows in set. Elapsed: 0.035 sec.
clickhouse1 :)
- Querying data
clickhouse1 :) select * from clickhouse_kafka_test_local;
SELECT *
FROM clickhouse_kafka_test_local
Query id: 40ab902f-54b5-4cd2-98c6-d20c9985b9e8
┌─id─┬─name──┬─city──────┐
│ 2 │ name2 │ Shanghai │
│ 1 │ name1 │ Beijing │
│ 3 │ name3 │ Guangzhou │
└────┴───────┴───────────┘
3 rows in set. Elapsed: 0.566 sec.
clickhouse1 :) select * from clickhouse_kafka_test_local;
SELECT *
FROM clickhouse_kafka_test_local
Query id: 19479a67-9734-4f5e-99e8-6b708e0f5fb3
Ok.
0 rows in set. Elapsed: 5.013 sec.
clickhouse1 :)
As shown above, a single SELECT consumes the data; a second SELECT returns nothing, because consumption resumes from the advanced Kafka consumer offset. One way to re-read the topic during testing is sketched below.
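Since offsets are tracked per consumer group, creating the table again with a fresh kafka_group_name (a hypothetical name here) starts consumption from the earliest offset again, given the auto_offset_reset = earliest setting above:

create table clickhouse_kafka_test_local2(
    id UInt32,
    name String,
    city String
) engine = Kafka()
settings
    kafka_broker_list = 'clickhouse1:9092,clickhouse2:9092,clickhouse3:9092',
    kafka_topic_list = 'clickhouse_kafka_test',
    kafka_group_name = 'clickhouse_kafka_test_group2', -- new group, offsets start over
    kafka_format = 'JSONEachRow';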
3.3 Kafka Table Engine with a Materialized View (repeated SELECTs return data)
- Preparation
Drop the clickhouse_kafka_test_local table:
clickhouse1 :) drop table clickhouse_kafka_test_local;
Delete the Kafka topic clickhouse_kafka_test:
[root@clickhouse1 ~]# kafka_2.13-2.8.0/bin/kafka-topics.sh --bootstrap-server clickhouse1:9092,clickhouse2:9092,clickhouse3:9092 --delete --topic clickhouse_kafka_test
Then repeat step 3.1.
- Creating the tables
Create a data channel table with the Kafka engine on every node of the cluster, connected to Kafka:
clickhouse1 :)
clickhouse1 :) create table clickhouse_kafka_test_channelLocal on cluster sharding_cluster(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = Kafka()
:-] settings
:-] kafka_broker_list = 'clickhouse1:9092,clickhouse2:9092,clickhouse3:9092',
:-] kafka_topic_list = 'clickhouse_kafka_test',
:-] kafka_group_name = 'clickhouse_kafka_test_group',
:-] kafka_format = 'JSONEachRow',
:-] kafka_skip_broken_messages = 100;
CREATE TABLE clickhouse_kafka_test_channelLocal ON CLUSTER sharding_cluster
(
`id` UInt32,
`name` String,
`city` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'clickhouse1:9092,clickhouse2:9092,clickhouse3:9092', kafka_topic_list = 'clickhouse_kafka_test', kafka_group_name = 'clickhouse_kafka_test_group', kafka_format = 'JSONEachRow', kafka_skip_broken_messages = 100
Query id: a719f376-838f-4494-bcee-e4d470b34ae2
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.141 sec.
clickhouse1 :)
Create a distributed materialized view to sync the data from the Kafka channel table into a replicated MergeTree table:
clickhouse1 :)
clickhouse1 :) create materialized view clickhouse_kafka_test_viewAll on cluster sharding_ha(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = Distributed(sharding_ha, default, clickhouse_kafka_test_storageLocal, id)
:-] populate
:-] as select * from clickhouse_kafka_test_channelLocal;
CREATE MATERIALIZED VIEW clickhouse_kafka_test_viewAll ON CLUSTER sharding_ha
(
`id` UInt32,
`name` String,
`city` String
)
ENGINE = Distributed(sharding_ha, default, clickhouse_kafka_test_storageLocal, id) POPULATE AS
SELECT *
FROM clickhouse_kafka_test_channelLocal
Query id: e4084a86-f777-45e7-bef7-e2f568d93db2
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │ 0 │ │ 3 │ 3 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 2 │ 1 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 1 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 15.574 sec.
clickhouse1 :)
Create a replicated MergeTree table on the cluster to store the materialized view's data:
clickhouse1 :)
clickhouse1 :) create table clickhouse_kafka_test_storageLocal on cluster sharding_ha(
:-] id UInt32,
:-] name String,
:-] city String
:-] ) engine = ReplicatedMergeTree('/clickhouse/tables/clickhouse_kafka_test/{shard}', '{replica}')
:-] order by id
:-] primary key id
:-] partition by city;
CREATE TABLE clickhouse_kafka_test_storageLocal ON CLUSTER sharding_ha
(
`id` UInt32,
`name` String,
`city` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/clickhouse_kafka_test/{shard}', '{replica}')
PARTITION BY city
PRIMARY KEY id
ORDER BY id
Query id: 9506837c-f1ca-4c0f-aa9a-2370cc423546
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse3 │ 9000 │ 0 │ │ 3 │ 3 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.194 sec.
clickhouse1 :)
- Querying data
clickhouse1 :)
clickhouse1 :) select * from clickhouse_kafka_test_viewAll;
SELECT *
FROM clickhouse_kafka_test_viewAll
Query id: 26073fb1-8011-465a-b52e-bec15174e451
┌─id─┬─name──┬─city─────┐
│ 2 │ name2 │ Shanghai │
└────┴───────┴──────────┘
┌─id─┬─name──┬─city────┐
│ 1 │ name1 │ Beijing │
└────┴───────┴─────────┘
┌─id─┬─name──┬─city──────┐
│ 3 │ name3 │ Guangzhou │
└────┴───────┴───────────┘
3 rows in set. Elapsed: 0.017 sec.
clickhouse1 :) select * from clickhouse_kafka_test_viewAll;
SELECT *
FROM clickhouse_kafka_test_viewAll
Query id: a7059da9-40b7-4143-b91c-708d807071eb
┌─id─┬─name──┬─city─────┐
│ 2 │ name2 │ Shanghai │
└────┴───────┴──────────┘
┌─id─┬─name──┬─city────┐
│ 1 │ name1 │ Beijing │
└────┴───────┴─────────┘
┌─id─┬─name──┬─city──────┐
│ 3 │ name3 │ Guangzhou │
└────┴───────┴───────────┘
3 rows in set. Elapsed: 0.009 sec.
clickhouse1 :)
Both SELECTs return data, because the queries now read from the clickhouse_kafka_test_storageLocal table.
- Attaching and detaching the materialized view
To stop synchronization, detach the materialized view:
clickhouse1 :)
clickhouse1 :) detach table clickhouse_kafka_test_viewAll on cluster sharding_ha;
DETACH TABLE clickhouse_kafka_test_viewAll ON CLUSTER sharding_ha
Query id: dc221033-fc04-4e95-95ad-fbe9a052d27f
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.135 sec.
clickhouse1 :)
To resume synchronization, attach the materialized view again:
clickhouse1 :)
clickhouse1 :) ATTACH TABLE clickhouse_kafka_test_viewAll on cluster sharding_ha;
ATTACH TABLE clickhouse_kafka_test_viewAll ON CLUSTER sharding_ha
Query id: 7940821b-8d55-452a-840a-9da97082e1de
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.131 sec.
clickhouse1 :)
3.4 Kafka Table Engine Parameters
- kafka_format (required): the format of the Kafka messages; must be a format ClickHouse supports, such as TSV, JSONEachRow, or CSV
- kafka_row_delimiter: the delimiter between rows; defaults to '\0'
- kafka_num_consumers: the number of consumers in the consumer group that actually consume data; defaults to 1; one partition can be consumed by only one consumer
- kafka_skip_broken_messages: defaults to 0, meaning a single unparseable message stops ingestion; if set to 10, ingestion stops once the total number of unparseable messages reaches 10, and the first 9 broken messages are silently skipped
- stream_poll_timeout_ms: defaults to 500 ms; the interval at which the Kafka table engine polls for data; polled data goes into a buffer and is flushed to the table when either of the following two thresholds is met:
- kafka_max_block_size: defaults to max_block_size = 65536; when a block accumulates this many rows, it is flushed to the table
- stream_flush_interval_ms: the time interval at which buffered data is flushed to the table
- kafka_commit_every_batch: a block is made up of several message batches; defaults to 0, meaning the Kafka consumer offset is committed only after a whole block has been written to the table; if 1, the offset is committed after each batch is written
- Custom parameters: the Kafka table engine talks to Kafka through librdkafka; see the librdkafka configuration documentation for custom parameters, which must be defined under the kafka tag in /etc/clickhouse-server/config.xml
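For example, librdkafka option names are written with underscores instead of dots as tags under the kafka element (the values here are illustrative assumptions):

<kafka>
    <auto_offset_reset>earliest</auto_offset_reset>
    <!-- librdkafka debug contexts, e.g. consumer-group handling -->
    <debug>cgrp</debug>
    <!-- librdkafka max.poll.interval.ms -->
    <max_poll_interval_ms>300000</max_poll_interval_ms>
</kafka>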