Logstash同步Hive和Clickhouse
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Logstash同步Hive和Clickhouse相关的知识,希望对你有一定的参考价值。
参考技术A工作中我们遇到了把Hive数据同步到Clickhouse的业务需求,一开始我们写Spark任务,用SparkSQL读Hive,再用JDBC写入到Clickhouse。
后来,随着要同步的表越来越多,每次都写Spark任务,成本就显得有些高了。于是,写了一个通用的Spark任务,指定Hive表、字段,指定Clickhouse表、字段,每次指定不同的参数。
再后来,业务越来越复杂,不仅是简单的同步,而是要支持更复杂的SQL,结果进行数据类型转换、值转化等,然后再插入Clickhouse。
这不是ETL要干的事儿吗?!
当然,继续增强之前的Spark,完全可以实现这个功能。但是说到ETL,不是有专业强大的Logstash吗,为什么要重复造轮子?
经过一番调研,还真有人写了Logstash插件,用来导出数据到Clickhouse: logstash-output-clickhouse
输出端搞定了,输入端怎么搞呢?很建达,用JDBC插件就可以了。
如上,配置jdbc连接信息即可。
需要说明的是,相关的jar包比较多,需要给全了,否则会有各种ClassNotFoundException。完整的jar列表为:
这些jar最好与hive环境版本一致,我们用的是CDH版,所以都是从CDH目录下找到的jar。
Clickhouse插件使用说明参考: https://github.com/mikechris/logstash-output-clickhouse
主要说下安装过程。
说明文档里说的 bin/logstash-plugin install logstash-output-clickhouse 方式,没有安装成功,所以只能自己编译安装。
先clone源码,然后进入源码根路径:
编译:
此时,若没有安装ruby环境,按照提示安装一下,再编译。
编译成功后,会多出一个文件
安装:
logstash的安装就不多说了,按照logstash官方文档安装就可以了。
此时,如果logstash版本是5.x,可能会遇到一个错误:
按照提示,修改gemfile:
修改logstash-mixin-http_client的版本:
原来是>6且<7,改成>5且<6。
然后,再次编译、安装,就可以了。
按照文档中的使用说明,配置Clickhouse连接信息即可:
这部分工作可以放在filter里处理,各种filter插件就不说了,参考logstash官方文档吧。
clickhouse使用waterdrop将Hive中的数据导入ClickHouse
1.概述
转载:使用waterdrop将Hive中的数据导入ClickHouse
这里仅仅自己学习用。
前言
最近有一个需求需要把hive的数据同步到clickhouse,而且数据量还比较大,所以使用导出csv再导入clickhouse的那种方式并不适合。由于公司使用的服务器是某云服务器,Hadoop的底层不是使用的是原生的hdfs,导致没法使用datax。
waterdrop
waterdrop的官方地址:https://interestinglab.github.io/waterdrop-docs/#/zh-cn/v1/
Waterdrop 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。
1)准备环境
使用 Waterdrop前请先准备好Spark和Java运行环境。java版本为jdk1.8
2)waterdrop下载和解压
下载地址:https://github.com/InterestingLab/waterdrop/releases
目前1.x的稳定版为v1.5.1。spark >= 2.3 下载 waterdrop-1.5.1.zip, spark < 2.3 下载waterdrop-1.5.1-with-spark.zip。
如果Github下载速度慢,可通过百度云(链接:https://pan.baidu.com/s/19GUwZPC2YBG9Pt7iuF9TNw 密码:upeb
) 直接下载。
解压:unzip waterdrop-1.5.1.zip
3)更改启动目录
cd到waterdrop的config目录下,找到waterdrop-env.sh并打开,找到如果spark_home没有配置环境变量,需要把spark_home改了,如果配置有环境变量就不用管。
hive2clickhouse
1)通用配置
一个完整的Waterdrop配置包含spark, input, filter, output, 即:
spark
...
input
...
filter
...
output
...
spark是spark相关的配置,可配置的spark参数见: Spark Configuration, 其中master, deploy-mode两个参数不能在这里配置,需要在Waterdrop启动脚本中指定。
input可配置任意的input插件及其参数,具体参数随不同的input插件而变化。
filter可配置任意的filter插件及其参数,具体参数随不同的filter插件而变化。
output可配置任意的output插件及其参数,具体参数随不同的output插件而变化。filter处理完的数据,会发送给output中配置的每个插件
2)编写脚本
spark
spark.sql.catalogImplementation = "hive"
spark.app.name = "Waterdrop"
spark.executor.instances = 30
spark.executor.cores = 2
spark.executor.memory = "2g"
input
hive
pre_sql = "select id,user_id,grade,money_type_id,money,shop,amount,balance,bi_platform_id,server_id,act_type,act_method,act_method2,record_time,bi_sid,gather_time,if(record_time is null,dateline,from_unixtime(record_time,'yyyy-MM-dd')) as dateline,bi_pid from sgz_big_2017.ods_bi_money_list where dateline='2021-01-01'"
table_name = "ods_bi_money_list"
filter
output
clickhouse
host = "xxx.xx.xxx.xx:8123"
database = "big_sgz_2017"
table = "bi_money_list"
fields = [ "id","user_id","grade","money_type_id","money","shop","amount","balance","bi_platform_id","server_id","act_type","act_method","act_method2","record_time","bi_sid","gather_time","dateline","bi_pid"]
username = "default"
password = "m8yjvWQ+"
相关配置参考clickhouse输出配置
3)部署和运行
在本地以local方式运行Waterdrop
./bin/start-waterdrop.sh --master local[4] --deploy-mode client --config ./config/application.conf
在Spark Standalone集群上运行Waterdrop
# client 模式
./bin/start-waterdrop.sh --master spark://207.184.161.138:7077 --deploy-mode client --config ./config/application.conf
# cluster 模式
./bin/start-waterdrop.sh --master spark://207.184.161.138:7077 --deploy-mode cluster --config ./config/application.conf
在Yarn集群上运行Waterdrop
# client 模式
./bin/start-waterdrop.sh --master yarn --deploy-mode client --config ./config/application.conf
# cluster 模式
./bin/start-waterdrop.sh --master yarn --deploy-mode cluster --config ./config/application.conf
在Mesos上运行Waterdrop
# cluster 模式
./bin/start-waterdrop.sh --master mesos://207.184.161.138:7077 --deploy-mode cluster --config ./config/application.conf
cluster、client、local模式下必须把hive-site.xml置于提交任务节点的HADOOP_CONF目录下(或者放在$SPARK_HOME/conf下面),IDE本地调试将其放在resources目录
4)遇到的问题
max concurrent queries
在设置spark的时候并行度设置过高,导致连接数超过ck配置的最大连接数。
解决办法:降低spark的并行度,增加ck的连接数(max_concurrent_queries)
connection reset
目前暂时没有找到是什么原因导致的超时连接,但是减少并行线程就不会报这个错误。
解决办法:
1)减少spark的并行度
2)在clickhouse的output插件中增加clickhouse.socket_timeout参数
以上是关于Logstash同步Hive和Clickhouse的主要内容,如果未能解决你的问题,请参考以下文章
建立Hive和Hbase的映射关系,通过Spark将Hive表中数据导入ClickHouse