ClickHouse Kafka引擎不消费问题排查

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ClickHouse Kafka引擎不消费问题排查相关的知识,希望对你有一定的参考价值。

参考技术A 开发环境运行正常的程序在测试程序一直运行不了,提示kafka引擎表不能正常工作。我们ClickHouse版本是20.8.3。

发现kafka引擎报错提示字段不存在,我们的业务流程是kafka引擎表--->ods层--->dws层。其中引擎表--ods,ods到dws都是通过物化视图来进行数据预处理。出问题的是ods到dws的物化视图,导致kafka引擎表无法正常工作。当然,dws、ods层都是没有数据的。从这一点来看,ClickHouse物化视图确实无法保证数据的一致性,其就是个管道。

知道问题所在就晓得如何解决了,排查过程中发现ClickHouse此版本的一个BUG:就是较复杂的嵌套SQL,执行后,可能存在别名丢失的问题。比如如下的一个SQL,执行后,查看表的DDL语句发现别名DDD会丢失。建表的时候需要特别注意。

clickhouse与kafka集成

参考技术A clickhouse支持与多种存储引擎集成,可以从集成的引擎里面读取消息,然后写到真正的数据存储表里。

clickhouse批量写入的性能比较好,我们的业务场景下会大批量的产生数据,如果使用clickhouse-jdbc去写的,写入时机和每批次写入的数量不好把控,最终选择了先将消息写入kafka,然后由clickhouse从kafka消费数据,clickhouse server消费到数据之后写入真正的数据表。

clickhouse集成kafka引擎见官方文档:
https://clickhouse.com/docs/zh/engines/table-engines/integrations/kafka/

下面的介绍会与官方文档有重复,然后补充一些集成过程中遇到的坑。

下面介绍clickhouse与kafka集成的步骤,clickhouse版本是22.1.3.7

必要参数

可选参数

关于必选参数中的kafka_format参数,参见Formats部分,format具体解释如下
https://clickhouse.com/docs/zh/interfaces/formats/ 。

JSONEachRow, JSONStringsEachRow, JSONCompactEachRow, JSONCompactStringsEachRow
这几种格式,ClickHouse会将行输出为用换行符分隔的JSON值,这些输出数据作为一个整体时,由于没有分隔符(,)因而不是有效的JSON文档。
官方文档给了一些示例。

由于我的真实的数据表,有一个字段是json类型的字符串,但是一开始设置kafka_format的类型为JSONEachRow时,从kafka消费数据会报错,所以kafka_format格式设置成了JSONAsString,具体的错误后面贴出来。

创建kafka引擎表,用于从kafka消费数据

由于我的数据结构里有嵌套json,如果使用JSONEachRow,有个字段是json类型的字符串,带转义字符,导致clickhouse解析失败,没找到解决办法,所以使用了JSONAsString格式。

一个简单的MergeTree引擎的表,其中content是json格式的字符串。

创建的物化视图用于把从kafka消费到的数据,写到真实的数据表里,在这个例子里,msg_json_source从kafka消费到数据,然后通过物化视图msg_json_source_consumer将消费到的数据写到真实的数据表msg_target中。

由于从kafka消费到的数据就是一个json字符串,在这里使用JSONExtractString等json字段提取工具,提取msg里的字段,比如biz,sender_id,content等字段。

status_time原本计划用DatTime64类型的,但是这个时间格式有坑,最终选择了使用UInt64存毫秒级时间戳,具体的问题下面再介绍。

在clickhouse创建好3张表之后(kafka引擎表,真实数据表,物化视图表),往kafka发消息
本地安装一个简易的kafka服务端,然后创建topic

创建好topic之后,使用Java客户端往kafka发消息,使用confluent client发也可以。
添加kafka依赖

实体类,使用fastjson的@JSONField注解,实体类转字符串的时候,将驼峰转换为下划线

测试类

最终发送完,我们查看一下clickhouse里的数据表的数据,可以发现我们发送到kakfa里的数据,已经成功的消费,并且写入到真实的数据表里了。

当时测试环境部署的版本是21.9,但是这个版本有问题,不推荐安装,建议直接部署22以上的clickhouse

我一开始就是使用的JSONEachRow格式,但是我的消息体里还有嵌套的json,类似下面这种格式,里面有个字段还是个json,转行成字符串带转义字符。
然后消息体的string字符串贴一条在这里

然后clickhouse解析消息体报错,当时的错找不到了,现在复现不出来了,非常的难顶。。。。
后来因为赶版本的原因把kafka_format换成了JSONAsString。

clickhouse是支持DateTime64格式的,可以到毫秒级,但是实际使用过程中却有些坑在,

首先是有的客户端解析毫秒字符串有问题,其次是使用JSONExtract*的方法,会有差异,再然后是jdbc查询的时候,也会导致时间查询有问题。
拿毫秒时间戳和秒级时间戳做试验,clickhouse-server版本是22.3.1.1

把上面的kafka引擎表拿出来改一下

其中status_time这个字段的类型改成DateTime64(3, 'Asia/Shanghai'),使用JSONExtractUInt提取时间,看下效果

首先发条数据,数据内容如下

传入的是毫秒级时间戳,然后数据表存储的时候就变成了2282年

然后如果传入秒级的时间戳,真实的数据是这样

clickhouse存储的时候看着时间正常了,但是毫秒丢失了

然后修改一下物化视图的字段提取方式,之前是 JSONExtractUInt(msg,'status_time') as status_time,现在改成使用 JSONExtractString(msg,'status_time') as status_time提取时间
会发现时间类型又正常了。

这一条数据内容如下

最终使用JSONExtractString提取毫秒时间戳,得到了正确的DateTime64的时间,非常的神奇

最终我决定来了个釜底抽薪的方法,时间直接用UInt64存,因为我发送出去的数据是毫秒级时间戳,最终存时间戳,查询时间范围的时候直接用long类型的数据between好了。

这也是无奈之举,万一哪天server更新版本,导致时间出现问题,那就完蛋了,希望后面时间可以稳定一点吧。

以上是关于ClickHouse Kafka引擎不消费问题排查的主要内容,如果未能解决你的问题,请参考以下文章

ClickHouse kafka引擎落盘分布式表

如何在clickhouse中更改kafka引擎的设置

Clickhouse外部储存表引擎(HDFSMySQLKafka)

ClickHouse Kafka 表引擎和仅一次支持

使用带有kafka引擎的clickhouse进行融合模式注册表身份验证

Flink 消费Kafka每日不定时积压(非重启不能解决)问题排查解决