Kettle Elasticsearch插件升级
Posted data之道
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kettle Elasticsearch插件升级相关的知识,希望对你有一定的参考价值。
一、背景
二、升级步骤
2.1 源码导入
2.2 版本号修改
2.3 ES Client对比
2.4 代码修改
2.5 打包发布
三、遗留问题
一、背景
项目中有对大数据量查询的需求,数据在hive上加工,ES作为对底层查询的支持。这样的体系结构涉及到hive数据迁移到ES的过程,计划用kettle的ES批量加载插件加载数据到ES,在实际开发中发现kettle连不上ES集群。
我们ES是5.5.1版本,而kettle默认支持的版本较低;Java Client连接ES,Client的主版本号要和ES集群主版本号保持一致,否则有可能连不上集群、或者即使连上也不支持新功能,所以要对ES插件进行升级。
二、升级步骤
2.1、源码导入
下载对应kettle版本源码、导入项目到IDE,项目路径:plugins/elasticsearch-bulk-insertx。8.0以后版本用maven管理,之前是ant,我们用的是6.0版本,下面的配置就以ant为准。(有下载源码的例子)
2.2、版本号修改
打开ivy.xml,看到es版本是1.5.2
增加对5.5.1版本支持:
2.3、ES Client对比
java client有两种方法连接到ES集群:Transport Client和Node Client,两者实现原理有很大不同。
Node Client会加入集群,但不存储数据、也就不能成为主节点。因本身是集群的一个节点,它知道整个集群的状态(所有的节点在哪、哪个分片在哪个节点上等),这意味着执行api时会少一层网络跳跃。
如果想要应用和集群分离、应用经常快速创建和销毁连接,那么Transport Client是理想选择,集群管理节点的增加和删除也是有开销的,如果频繁进行节点变动,会降低集群性能;如果只需要几个长连接、持久连接到集群的对象,那么node client是一个更好的选择,因为它知道集群的结构。
kettle在每次开始执行trans时才创建client连接、结束时要销毁连接,所以TC相对会更高效些。
2.4、代码修改
EnvUtil工具类可以获得kettle.properties配置文件中的配置项。
Builder builder = Settings.builder();
builder.put(meta.getSettings());
//开启自动检查机器节点(嗅探功能)
builder.put("client.transport.sniff", true);
/**1、判断是否配置了cluster.name
* 2、优先级: trans配置 > kettle.properties
*/
if (Strings.isNullOrEmpty(builder.get("cluster.name"))
&& !Strings.isNullOrEmpty(envUtil.getSystem
Property("es.cluster.name"))) {
builder.put("cluster.name", envUtil.getSystem
Property("es.cluster.name"));
}
Settings settings = builder.build();
TransportClient tClient = new PreBuiltTransport
Client(settings);
/**1、判断是否配置了集群地址
* 2、优先级: trans配置 > kettle.properties
*/
if (meta.getServers().length > 0) {
for (InetSocketTransportAddress address : meta.
getServers()) {
tClient.addTransportAddress(address);
}
} else if (!Strings.isNullOrEmpty(envUtil.getSystem
Property("es.brokes"))&& !Strings.isNullOrEmpty(
envUtil.getSystemProperty("es.port"))) {
for (String addr : envUtil.getSystemProperty(
"es.brokes").split(",")) {
try {//增加多个初始地址 host:port
tClient.addTransportAddress(new InetSocket
TransportAddress(InetAddress.getByName(
addr),Integer.valueOf(envUtil.getSystem
Property("es.port"))));
} catch (NumberFormatException e) {
e.printStackTrace();
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
}
client = tClient;
2.5、打包发布
1、打包命令:$ ant resolve package,打包后的压缩包在dist目录下
2、删除老的文件:$KETTLE_HOME/plugins/elasticsearch-bulk-insert-plugin
3、第一步的压缩包解压到第二步的文件夹
三、遗留问题
用kettle做数据从hive抽取到es,hive输入不能并行执行,而且数据的传输压力在kettle服务器上,没有充分利用hadoop集群。可以参考sqoop(),也就是可以用sqoop把数据从hdfs文件抽取到es,这是个hive和es的集成思路。但sqoop是不支持es的,要做es export的开发,下个目标是做es export开发,对比二者性能。
以上是关于Kettle Elasticsearch插件升级的主要内容,如果未能解决你的问题,请参考以下文章
ELK(elasticsearch5.0)head插件安装配置
elasticsearch 6.2 版本升级至6.4 版本服务启动报错