Kettle Elasticsearch插件升级

Posted data之道

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kettle Elasticsearch插件升级相关的知识,希望对你有一定的参考价值。

  • 一、背景

  • 二、升级步骤

    • 2.1 源码导入

    • 2.2 版本号修改

    • 2.3 ES Client对比

    • 2.4 代码修改

    • 2.5 打包发布

  • 三、遗留问题

一、背景

    项目中有对大数据量查询的需求,数据在hive上加工,ES作为对底层查询的支持。这样的体系结构涉及到hive数据迁移到ES的过程,计划用kettle的ES批量加载插件加载数据到ES,在实际开发中发现kettle连不上ES集群。

    我们ES是5.5.1版本,而kettle默认支持的版本较低;Java Client连接ES,Client的主版本号要和ES集群主版本号保持一致,否则有可能连不上集群、或者即使连上也不支持新功能,所以要对ES插件进行升级。

二、升级步骤

    2.1、源码导入

    下载对应kettle版本源码、导入项目到IDE,项目路径:plugins/elasticsearch-bulk-insertx。8.0以后版本用maven管理,之前是ant,我们用的是6.0版本,下面的配置就以ant为准。(有下载源码的例子)

    2.2、版本号修改

    打开ivy.xml,看到es版本是1.5.2

增加对5.5.1版本支持:

    2.3、ES Client对比

    java client有两种方法连接到ES集群:Transport Client和Node Client,两者实现原理有很大不同。

    Node Client会加入集群,但不存储数据、也就不能成为主节点。因本身是集群的一个节点,它知道整个集群的状态(所有的节点在哪、哪个分片在哪个节点上等),这意味着执行api时会少一层网络跳跃。

    如果想要应用和集群分离、应用经常快速创建和销毁连接,那么Transport Client是理想选择,集群管理节点的增加和删除也是有开销的,如果频繁进行节点变动,会降低集群性能;如果只需要几个长连接、持久连接到集群的对象,那么node client是一个更好的选择,因为它知道集群的结构。

    kettle在每次开始执行trans时才创建client连接、结束时要销毁连接,所以TC相对会更高效些。

    2.4、代码修改

EnvUtil工具类可以获得kettle.properties配置文件中的配置项。

Builder builder = Settings.builder();
builder.put(meta.getSettings());
//开启自动检查机器节点(嗅探功能)
builder.put("client.transport.sniff", true);

/**1、判断是否配置了cluster.name
* 2、优先级: trans配置 > kettle.properties
*/
if (Strings.isNullOrEmpty(builder.get("cluster.name"))
&& !Strings.isNullOrEmpty(envUtil.getSystem
       Property
("es.cluster.name"))) {
builder.put("cluster.name", envUtil.getSystem
       Property
("es.cluster.name"));
}
Settings settings = builder.build();
TransportClient tClient = new PreBuiltTransport
Client(settings);

/**1、判断是否配置了集群地址
* 2、优先级: trans配置 > kettle.properties
*/
if (meta.getServers().length > 0) {
for (InetSocketTransportAddress address : meta.
getServers()) {
tClient.addTransportAddress(address);
   }
} else if (!Strings.isNullOrEmpty(envUtil.getSystem
Property
("es.brokes"))&& !Strings.isNullOrEmpty(
envUtil.getSystemProperty("es.port"))) {
for (String addr : envUtil.getSystemProperty(
"es.brokes").split(",")) {
try {//增加多个初始地址 host:port
tClient.addTransportAddress(new InetSocket
TransportAddress(InetAddress.getByName(
addr),Integer.valueOf(envUtil.getSystem
               Property
("es.port"))));
       } catch (NumberFormatException e) {
           e.printStackTrace();
       } catch (UnknownHostException e) {
           e.printStackTrace();
       }
}
}
client = tClient;

    2.5、打包发布

      1、打包命令:$ ant resolve  package,打包后的压缩包在dist目录下

     2、删除老的文件:$KETTLE_HOME/plugins/elasticsearch-bulk-insert-plugin

     3、第一步的压缩包解压到第二步的文件夹

三、遗留问题

    用kettle做数据从hive抽取到es,hive输入不能并行执行,而且数据的传输压力在kettle服务器上,没有充分利用hadoop集群。可以参考sqoop(),也就是可以用sqoop把数据从hdfs文件抽取到es,这是个hive和es的集成思路。但sqoop是不支持es的,要做es export的开发,下个目标是做es export开发,对比二者性能。


以上是关于Kettle Elasticsearch插件升级的主要内容,如果未能解决你的问题,请参考以下文章

ELK(elasticsearch5.0)head插件安装配置

kibana 6.2.4 升级 6.4.2

elasticsearch 6.2 版本升级至6.4 版本服务启动报错

Kettle java脚本组件的使用说明(简单使用升级使用)

Elasticsearch 5.0

kettle处理未发现hadoop插件问题