Logstash同步Hive和Clickhouse

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Logstash同步Hive和Clickhouse相关的知识,希望对你有一定的参考价值。

参考技术A

工作中我们遇到了把Hive数据同步到Clickhouse的业务需求,一开始我们写Spark任务,用SparkSQL读Hive,再用JDBC写入到Clickhouse。

后来,随着要同步的表越来越多,每次都写Spark任务,成本就显得有些高了。于是,写了一个通用的Spark任务,指定Hive表、字段,指定Clickhouse表、字段,每次指定不同的参数。

再后来,业务越来越复杂,不仅是简单的同步,而是要支持更复杂的SQL,结果进行数据类型转换、值转化等,然后再插入Clickhouse。

这不是ETL要干的事儿吗?!

当然,继续增强之前的Spark,完全可以实现这个功能。但是说到ETL,不是有专业强大的Logstash吗,为什么要重复造轮子?

经过一番调研,还真有人写了Logstash插件,用来导出数据到Clickhouse: logstash-output-clickhouse

输出端搞定了,输入端怎么搞呢?很建达,用JDBC插件就可以了。

如上,配置jdbc连接信息即可。

需要说明的是,相关的jar包比较多,需要给全了,否则会有各种ClassNotFoundException。完整的jar列表为:

这些jar最好与hive环境版本一致,我们用的是CDH版,所以都是从CDH目录下找到的jar。

Clickhouse插件使用说明参考: https://github.com/mikechris/logstash-output-clickhouse

主要说下安装过程。

说明文档里说的 bin/logstash-plugin install logstash-output-clickhouse 方式,没有安装成功,所以只能自己编译安装。

先clone源码,然后进入源码根路径:

编译:

此时,若没有安装ruby环境,按照提示安装一下,再编译。

编译成功后,会多出一个文件

安装:

logstash的安装就不多说了,按照logstash官方文档安装就可以了。

此时,如果logstash版本是5.x,可能会遇到一个错误:

按照提示,修改gemfile:

修改logstash-mixin-http_client的版本:

原来是>6且<7,改成>5且<6。

然后,再次编译、安装,就可以了。

按照文档中的使用说明,配置Clickhouse连接信息即可:

这部分工作可以放在filter里处理,各种filter插件就不说了,参考logstash官方文档吧。

clickhouse使用waterdrop将Hive中的数据导入ClickHouse

1.概述

转载:使用waterdrop将Hive中的数据导入ClickHouse
这里仅仅自己学习用。

前言
最近有一个需求需要把hive的数据同步到clickhouse,而且数据量还比较大,所以使用导出csv再导入clickhouse的那种方式并不适合。由于公司使用的服务器是某云服务器,Hadoop的底层不是使用的是原生的hdfs,导致没法使用datax。

waterdrop
waterdrop的官方地址:https://interestinglab.github.io/waterdrop-docs/#/zh-cn/v1/

Waterdrop 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。

1)准备环境
使用 Waterdrop前请先准备好Spark和Java运行环境。java版本为jdk1.8

2)waterdrop下载和解压

下载地址:https://github.com/InterestingLab/waterdrop/releases

目前1.x的稳定版为v1.5.1。spark >= 2.3 下载 waterdrop-1.5.1.zip, spark < 2.3 下载waterdrop-1.5.1-with-spark.zip。

如果Github下载速度慢,可通过百度云(链接:https://pan.baidu.com/s/19GUwZPC2YBG9Pt7iuF9TNw 密码:upeb) 直接下载。

解压:unzip waterdrop-1.5.1.zip

3)更改启动目录
cd到waterdrop的config目录下,找到waterdrop-env.sh并打开,找到如果spark_home没有配置环境变量,需要把spark_home改了,如果配置有环境变量就不用管。

hive2clickhouse
1)通用配置
一个完整的Waterdrop配置包含spark, input, filter, output, 即:

spark 
    ...


input 
    ...


filter 
    ...


output 
    ...


spark是spark相关的配置,可配置的spark参数见: Spark Configuration, 其中master, deploy-mode两个参数不能在这里配置,需要在Waterdrop启动脚本中指定。
input可配置任意的input插件及其参数,具体参数随不同的input插件而变化。
filter可配置任意的filter插件及其参数,具体参数随不同的filter插件而变化。
output可配置任意的output插件及其参数,具体参数随不同的output插件而变化。filter处理完的数据,会发送给output中配置的每个插件

2)编写脚本

spark 
  spark.sql.catalogImplementation = "hive"
  spark.app.name = "Waterdrop"
  spark.executor.instances = 30
  spark.executor.cores = 2
  spark.executor.memory = "2g"

input 
    hive 
        pre_sql = "select id,user_id,grade,money_type_id,money,shop,amount,balance,bi_platform_id,server_id,act_type,act_method,act_method2,record_time,bi_sid,gather_time,if(record_time is null,dateline,from_unixtime(record_time,'yyyy-MM-dd')) as dateline,bi_pid from sgz_big_2017.ods_bi_money_list where dateline='2021-01-01'"
        table_name = "ods_bi_money_list"
    


filter 

output 
    clickhouse 
        host = "xxx.xx.xxx.xx:8123"
        database = "big_sgz_2017"
        table = "bi_money_list"
        fields = [ "id","user_id","grade","money_type_id","money","shop","amount","balance","bi_platform_id","server_id","act_type","act_method","act_method2","record_time","bi_sid","gather_time","dateline","bi_pid"]
        username = "default"
        password = "m8yjvWQ+"
    


相关配置参考clickhouse输出配置

3)部署和运行
在本地以local方式运行Waterdrop

 ./bin/start-waterdrop.sh --master local[4] --deploy-mode client --config ./config/application.conf
 

在Spark Standalone集群上运行Waterdrop

# client 模式
./bin/start-waterdrop.sh --master spark://207.184.161.138:7077 --deploy-mode client --config ./config/application.conf

# cluster 模式
./bin/start-waterdrop.sh --master spark://207.184.161.138:7077 --deploy-mode cluster --config ./config/application.conf

在Yarn集群上运行Waterdrop

# client 模式
./bin/start-waterdrop.sh --master yarn --deploy-mode client --config ./config/application.conf

# cluster 模式
./bin/start-waterdrop.sh --master yarn --deploy-mode cluster --config ./config/application.conf

在Mesos上运行Waterdrop

# cluster 模式
./bin/start-waterdrop.sh --master mesos://207.184.161.138:7077 --deploy-mode cluster --config ./config/application.conf

cluster、client、local模式下必须把hive-site.xml置于提交任务节点的HADOOP_CONF目录下(或者放在$SPARK_HOME/conf下面),IDE本地调试将其放在resources目录

4)遇到的问题
max concurrent queries
在设置spark的时候并行度设置过高,导致连接数超过ck配置的最大连接数。
解决办法:降低spark的并行度,增加ck的连接数(max_concurrent_queries)
connection reset


目前暂时没有找到是什么原因导致的超时连接,但是减少并行线程就不会报这个错误。
解决办法:
1)减少spark的并行度
2)在clickhouse的output插件中增加clickhouse.socket_timeout参数

以上是关于Logstash同步Hive和Clickhouse的主要内容,如果未能解决你的问题,请参考以下文章

大数据:ClickHouse-HDFS集成

ClickHouse数据导入

建立Hive和Hbase的映射关系,通过Spark将Hive表中数据导入ClickHouse

hive导入到clickhouse的几种方式总结

比Hive快800倍!大数据实时分析领域黑马开源ClickHouse

基于Seatunnel连通Hive数仓和ClickHouse的实战