Elasticsearch: Creating an ingest pipeline processor from scratch

Posted by the Elastic China Community official blog



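In my earlier article:

I described in detail how to use some tools to generate the most basic ingest pipeline processor. In this article, I go a step further and walk through a complete example, using the latest Elastic Stack 8.4.0. We will design a processor named sample: it extracts the first letter of a field in the document, converts it to lowercase, and writes it into a field chosen by the user.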
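The transformation itself does not depend on Elasticsearch, so it can be written and unit-tested in plain Java first. The following is a minimal sketch; the class and method names (SampleInitial.firstInitial) are hypothetical and not part of any Elasticsearch API.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical helper that isolates the "first letter, lowercased" rule
// so it can be tested without a running Elasticsearch node.
final class SampleInitial {

    private SampleInitial() {
    }

    // Returns the lowercased first character of the trimmed value,
    // or an empty Optional when the value is null or blank.
    static Optional<String> firstInitial(String value) {
        if (value == null) {
            return Optional.empty();
        }
        String trimmed = value.trim();
        if (trimmed.isEmpty()) {
            return Optional.empty();
        }
        // Locale.ROOT avoids locale-sensitive surprises (e.g. the Turkish dotless i)
        return Optional.of(trimmed.substring(0, 1).toLowerCase(Locale.ROOT));
    }
}
```

For example, firstInitial("Liu") yields Optional[l], and a blank string yields Optional.empty.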

Installation

If you have not installed the Elastic Stack yet, refer to the following articles to install Elasticsearch and Kibana:

Creating the plugin template

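As described in the earlier article “Elasticsearch: Creating an Elasticsearch Ingest plugin”, we can generate the skeleton with the elasticsearch-plugin-archetype Maven archetype. We use the following command to create a basic plugin template: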

mvn archetype:generate \
    -DarchetypeGroupId=org.codelibs \
    -DarchetypeArtifactId=elasticsearch-plugin-archetype \
    -DarchetypeVersion=6.6.0 \
    -DgroupId=com.liuxg \
    -DartifactId=elasticsearch-plugin \
    -Dversion=1.0-SNAPSHOT \
    -DpluginName=ingest

The command above creates a basic plugin template in a new directory called elasticsearch-plugin under the current directory. Let's change into that directory:

$ pwd
/Users/liuxg/java/plugins/elasticsearch-plugin
$ tree -L 8
.
├── pom.xml
└── src
    └── main
        ├── assemblies
        │   └── plugin.xml
        ├── java
        │   └── com
        │       └── liuxg
        │           ├── ingestPlugin.java
        │           └── rest
        │               └── RestingestAction.java
        └── plugin-metadata
            └── plugin-descriptor.properties

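The archetype generates the skeleton of a basic REST handler plugin, which is not what we need. We therefore rename the files and reorganize the directories. The adjusted layout is as follows: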

$ pwd
/Users/liuxg/java/plugins/elasticsearch-plugin
$ tree -L 8
.
├── pom.xml
└── src
    └── main
        ├── assemblies
        │   └── plugin.xml
        ├── java
        │   └── com
        │       └── liuxg
        │           ├── ingest
        │           │   └── SampleProcessor.java
        │           └── plugin
        │               └── ingest
        │                   └── ingestPlugin.java
        └── plugin-metadata
            └── plugin-descriptor.properties

Next, we modify pom.xml:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<name>elasticsearch-plugin</name>
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.liuxg</groupId>
	<artifactId>elasticsearch-plugin</artifactId>
	<version>1.0-SNAPSHOT</version>
	<packaging>jar</packaging>
	<description>elasticsearch ingest plugin</description>
	<inceptionYear>2019</inceptionYear>
	<licenses>
		<license>
			<name>The Apache Software License, Version 2.0</name>
			<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
			<distribution>repo</distribution>
		</license>
	</licenses>
	<properties>
		<elasticsearch.version>8.4.0</elasticsearch.version>
		<elasticsearch.plugin.classname>com.liuxg.plugin.ingest.ingestPlugin</elasticsearch.plugin.classname>
		<log4j.version>2.11.1</log4j.version>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
	</properties>
	<build>
		<plugins>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.8.0</version>
				<configuration>
					<source>${maven.compiler.source}</source>
					<target>${maven.compiler.target}</target>
					<encoding>UTF-8</encoding>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-surefire-plugin</artifactId>
				<version>2.22.1</version>
				<configuration>
					<includes>
						<include>**/*Tests.java</include>
					</includes>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-source-plugin</artifactId>
				<version>3.0.1</version>
				<executions>
					<execution>
						<id>attach-sources</id>
						<goals>
							<goal>jar</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>3.1.0</version>
				<configuration>
					<appendAssemblyId>false</appendAssemblyId>
					<outputDirectory>${project.build.directory}/releases/</outputDirectory>
					<descriptors>
						<descriptor>${basedir}/src/main/assemblies/plugin.xml</descriptor>
					</descriptors>
				</configuration>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
	<dependencies>
		<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>${elasticsearch.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>provided</scope>
		</dependency>
	</dependencies>
</project>

Above, we changed the following two lines:

		<elasticsearch.version>8.4.0</elasticsearch.version>
		<elasticsearch.plugin.classname>com.liuxg.plugin.ingest.ingestPlugin</elasticsearch.plugin.classname>

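elasticsearch.version must be set to the same version as your Elastic Stack; otherwise the plugin cannot be installed. We must also update elasticsearch.plugin.classname, because the plugin class has moved to a new package (com.liuxg.plugin.ingest).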
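A version or class-name mismatch is easy to catch before installing. The sketch below assumes that the packaged plugin-descriptor.properties uses the standard keys elasticsearch.version and classname, presumably filled in from the pom.xml properties at build time; DescriptorCheck is a hypothetical helper, not part of the Elasticsearch tooling.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-install check for the contents of a plugin-descriptor.properties file.
final class DescriptorCheck {

    private DescriptorCheck() {
    }

    // Returns a list of problems; an empty list means the checked keys look consistent.
    static List<String> check(String descriptorText, String clusterVersion, String expectedClassname) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(descriptorText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> problems = new ArrayList<>();
        String esVersion = props.getProperty("elasticsearch.version");
        if (!clusterVersion.equals(esVersion)) {
            problems.add("elasticsearch.version is " + esVersion + ", cluster is " + clusterVersion);
        }
        String classname = props.getProperty("classname");
        if (!expectedClassname.equals(classname)) {
            problems.add("classname is " + classname + ", expected " + expectedClassname);
        }
        return problems;
    }
}
```

You would pass it the text of the descriptor found in the built package, the version of your cluster (8.4.0 here), and com.liuxg.plugin.ingest.ingestPlugin as the expected class.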

Next, we modify ingestPlugin.java:

ingestPlugin.java

package com.liuxg.plugin.ingest;

import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import com.liuxg.ingest.SampleProcessor;

import java.util.Collections;
import java.util.Map;

public class ingestPlugin extends Plugin implements IngestPlugin {
    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        return Collections.singletonMap(SampleProcessor.TYPE, new SampleProcessor.Factory());
    }
}

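This class registers our processor with Elasticsearch: getProcessors maps the processor name sample (SampleProcessor.TYPE) to the factory that creates SampleProcessor instances.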
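Conceptually, getProcessors hands Elasticsearch a lookup table from processor name to factory: when a pipeline definition contains "sample": {...}, the factory registered under sample builds a processor from that configuration object. The plain-Java model below illustrates the idea; its Factory interface and the RegistryModel class are hypothetical simplifications and are not the Elasticsearch Processor.Factory API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model of name -> factory registration.
// Factory here is NOT org.elasticsearch.ingest.Processor.Factory; it only mirrors the idea.
final class RegistryModel {

    interface Factory {
        UnaryOperator<Map<String, Object>> create(Map<String, Object> config);
    }

    private RegistryModel() {
    }

    // Mirrors getProcessors(): one processor name mapped to one factory.
    static Map<String, Factory> registry() {
        Factory sample = RegistryModel::createSample;
        return Collections.singletonMap("sample", sample);
    }

    // Builds a "sample" processor from its config: copies the lowercased first
    // character of `field` into `target_field` (no error handling, for brevity).
    private static UnaryOperator<Map<String, Object>> createSample(Map<String, Object> config) {
        String field = (String) config.get("field");
        String targetField = (String) config.get("target_field");
        return doc -> {
            Map<String, Object> out = new HashMap<>(doc);
            String value = ((String) doc.get(field)).trim();
            out.put(targetField, value.substring(0, 1).toLowerCase(Locale.ROOT));
            return out;
        };
    }

    // Resolves the processor name used in a pipeline definition to its factory.
    static UnaryOperator<Map<String, Object>> build(String type, Map<String, Object> config) {
        Factory factory = registry().get(type);
        if (factory == null) {
            throw new IllegalArgumentException("unknown processor type [" + type + "]");
        }
        return factory.create(config);
    }
}
```

The point of the design is that the plugin only contributes the name and the factory; parsing the pipeline JSON and deciding when to call the factory stays with Elasticsearch.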

Next, we modify SampleProcessor.java:

SampleProcessor.java

package com.liuxg.ingest;

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Locale;
import java.util.Map;

public final class SampleProcessor extends AbstractProcessor {

    public static final String TYPE = "sample";

    private final String field;
    private final String targetField;
    private final String defaultValue;
    private final boolean ignoreMissing;

    public SampleProcessor(String tag, String description, String field, String targetField, boolean ignoreMissing, String defaultValue) {
        super(tag, description);
        this.field = field;
        this.targetField = targetField;
        this.ignoreMissing = ignoreMissing;
        this.defaultValue = defaultValue;
    }

    String getField() {
        return field;
    }

    String getTargetField() {
        return targetField;
    }

    String getDefaultField() {
        return defaultValue;
    }

    boolean isIgnoreMissing() {
        return ignoreMissing;
    }

    @Override
    public IngestDocument execute(IngestDocument document) {
        // The source field is required unless ignore_missing is true
        if (!document.hasField(field, true)) {
            if (ignoreMissing) {
                return document;
            } else {
                throw new IllegalArgumentException("field [" + field + "] not present as part of path [" + field + "]");
            }
        }
        // Refuse to overwrite an existing target field. With failOutOfRange = true,
        // hasField also throws if target_field points to an out-of-range array slot.
        if (document.hasField(targetField, true)) {
            throw new IllegalArgumentException("field [" + targetField + "] already exists");
        }

        Object value = document.getFieldValue(field, Object.class);
        // Only string values are handled; other types are left untouched
        if (value instanceof String) {
            String myValue = ((String) value).trim();
            // Any non-empty value, including a single character, yields an initial
            if (!myValue.isEmpty()) {
                // Locale.ROOT makes lowercasing independent of the JVM's default locale
                document.setFieldValue(targetField, myValue.substring(0, 1).toLowerCase(Locale.ROOT));
            }
        }
        return document;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    public static final class Factory implements Processor.Factory {
        @Override
        public Processor create(Map<String, Processor.Factory> processorFactories, String tag, String description, Map<String, Object> config) throws Exception {
            String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
            String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
            String defaultValue = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "defaultValue");
            boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
            return new SampleProcessor(tag, description, field, targetField, ignoreMissing, defaultValue);
        }
    }
}

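Above, we define the processor name sample (the TYPE constant); this is the name used in the pipeline definition in the test below. The processor reads field and target_field (both required), ignore_missing (optional, default false) and defaultValue (optional; read but not used by execute in this version). It takes the first letter of field, lowercases it, and writes it into target_field.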
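In Elasticsearch, the ConfigurationUtils.read* helpers remove each option from the config map as they read it, and pipeline creation fails if an unrecognized option is left over. The sketch below imitates that contract in plain Java so the option handling can be reasoned about in isolation; SampleConfig is hypothetical, and its boolean parsing is more lenient than Elasticsearch's.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, plain-Java imitation of how SampleProcessor.Factory reads its options.
// Like the Elasticsearch helpers, each option is removed from the map once read.
final class SampleConfig {

    final String field;
    final String targetField;
    final String defaultValue;   // optional, may be null
    final boolean ignoreMissing; // optional, defaults to false

    private SampleConfig(String field, String targetField, String defaultValue, boolean ignoreMissing) {
        this.field = field;
        this.targetField = targetField;
        this.defaultValue = defaultValue;
        this.ignoreMissing = ignoreMissing;
    }

    static SampleConfig parse(Map<String, Object> raw) {
        Map<String, Object> config = new HashMap<>(raw); // leave the caller's map untouched
        String field = required(config, "field");
        String targetField = required(config, "target_field");
        Object defaultValue = config.remove("defaultValue");
        Object ignore = config.remove("ignore_missing");
        boolean ignoreMissing = ignore != null && Boolean.parseBoolean(ignore.toString());
        // Anything still in the map is an option this processor does not understand
        if (!config.isEmpty()) {
            throw new IllegalArgumentException("unsupported parameters " + config.keySet());
        }
        return new SampleConfig(field, targetField,
                defaultValue == null ? null : defaultValue.toString(), ignoreMissing);
    }

    private static String required(Map<String, Object> config, String key) {
        Object value = config.remove(key);
        if (value == null) {
            throw new IllegalArgumentException("[" + key + "] required property is missing");
        }
        return value.toString();
    }
}
```

A practical consequence: a misspelled option such as "targetField" is rejected when the pipeline is created instead of being silently ignored.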

Compiling

In the project's root directory, we compile with the following command:

mvn clean install
$ pwd
/Users/liuxg/java/plugins/elasticsearch-plugin
$ mvn clean install
[INFO] Scanning for projects...
[INFO] 
[INFO] -------------------< com.liuxg:elasticsearch-plugin >-------------------
[INFO] Building elasticsearch-plugin 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ elasticsearch-plugin ---
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ elasticsearch-plugin ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/liuxg/java/plugins/elasticsearch-plugin/src/main/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.8.0:compile (default-compile) @ elasticsearch-plugin ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to /Users/liuxg/java/plugins/elasticsearch-plugin/target/classes
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ elasticsearch-plugin ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/liuxg/java/plugins/elasticsearch-plugin/src/test/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ elasticsearch-plugin ---
[INFO] No sources to compile
[INFO] 
[INFO] --- maven-surefire-plugin:2.22.1:test (default-test) @ elasticsearch-plugin ---
[INFO] No tests to run.
[INFO] 
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ elasticsearch-plugin ---
[INFO] Building jar: /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT.jar
[INFO] 
[INFO] >>> maven-source-plugin:3.0.1:jar (attach-sources) > generate-sources @ elasticsearch-plugin >>>
[INFO] 
[INFO] <<< maven-source-plugin:3.0.1:jar (attach-sources) < generate-sources @ elasticsearch-plugin <<<
[INFO] 
[INFO] 
[INFO] --- maven-source-plugin:3.0.1:jar (attach-sources) @ elasticsearch-plugin ---
[INFO] Building jar: /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT-sources.jar
[INFO] 
[INFO] --- maven-assembly-plugin:3.1.0:single (default) @ elasticsearch-plugin ---
[INFO] Reading assembly descriptor: /Users/liuxg/java/plugins/elasticsearch-plugin/src/main/assemblies/plugin.xml
[WARNING] The following patterns were never triggered in this artifact exclusion filter:
o  'org.elasticsearch:elasticsearch'

[INFO] Building zip: /Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
[INFO] 
[INFO] --- maven-install-plugin:2.4:install (default-install) @ elasticsearch-plugin ---
[INFO] Installing /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT.jar to /Users/liuxg/.m2/repository/com/liuxg/elasticsearch-plugin/1.0-SNAPSHOT/elasticsearch-plugin-1.0-SNAPSHOT.jar
[INFO] Installing /Users/liuxg/java/plugins/elasticsearch-plugin/pom.xml to /Users/liuxg/.m2/repository/com/liuxg/elasticsearch-plugin/1.0-SNAPSHOT/elasticsearch-plugin-1.0-SNAPSHOT.pom
[INFO] Installing /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT-sources.jar to /Users/liuxg/.m2/repository/com/liuxg/elasticsearch-plugin/1.0-SNAPSHOT/elasticsearch-plugin-1.0-SNAPSHOT-sources.jar
[INFO] Installing /Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip to /Users/liuxg/.m2/repository/com/liuxg/elasticsearch-plugin/1.0-SNAPSHOT/elasticsearch-plugin-1.0-SNAPSHOT.zip
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  5.213 s
[INFO] Finished at: 2022-09-08T11:50:02+08:00
[INFO] ------------------------------------------------------------------------

After a successful build, the installable file can be found under target/releases:

$ pwd
/Users/liuxg/java/plugins/elasticsearch-plugin
$ ls target/releases/
elasticsearch-plugin-1.0-SNAPSHOT.zip

The elasticsearch-plugin-1.0-SNAPSHOT.zip file shown above is the plugin package we can install.
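To double-check what ends up in the package before installing it, you can list the archive's entries. This is a minimal sketch using only java.util.zip; the class PluginZipInspector is hypothetical, and because the exact layout of the archive depends on src/main/assemblies/plugin.xml, hasDescriptor accepts a plugin-descriptor.properties entry at any depth.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical helper: lists the entries of a plugin zip so you can see what will be installed.
final class PluginZipInspector {

    private PluginZipInspector() {
    }

    static List<String> entryNames(Path zip) {
        List<String> names = new ArrayList<>();
        try (ZipFile zipFile = new ZipFile(zip.toFile())) {
            Enumeration<? extends ZipEntry> entries = zipFile.entries();
            while (entries.hasMoreElements()) {
                names.add(entries.nextElement().getName());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    // True if some entry is a plugin-descriptor.properties file (at any depth).
    static boolean hasDescriptor(List<String> names) {
        for (String name : names) {
            if (name.equals("plugin-descriptor.properties") || name.endsWith("/plugin-descriptor.properties")) {
                return true;
            }
        }
        return false;
    }
}
```

For example, PluginZipInspector.entryNames(Path.of("target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip")) (Path.of needs Java 11+) prints the file list you can compare against the descriptor and jar you expect.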

Installing and testing the plugin

Next, switch to the Elasticsearch installation directory and run the following command:

$ pwd
/Users/liuxg/elastic0/elasticsearch-8.4.0
$ bin/elasticsearch-plugin install file:///Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
-> Installing file:///Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
-> Downloading file:///Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
[=================================================] 100%   
-> Installed ingest
-> Please restart Elasticsearch to activate any plugins installed
$ ./bin/elasticsearch-plugin list
ingest

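From the output above, we can see that the plugin named ingest (the pluginName passed to the archetype) was installed successfully. Next, we must restart Elasticsearch: as the installer itself reports, a newly installed plugin only takes effect after a restart. This step is essential!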
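Because elasticsearch-plugin list only reports what is installed on disk, one way to confirm that the restarted node actually loaded the plugin is GET _cat/plugins, whose component column holds the plugin name. The sketch below assumes Java 11+ (java.net.http), a node URL and basic-auth credentials that you supply, and, since Elasticsearch 8 enables TLS by default with a self-signed CA, that this CA has been added to the JVM trust store. The class CatPlugins is hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical helper for confirming, after a restart, that the running node has loaded a plugin.
final class CatPlugins {

    private CatPlugins() {
    }

    // Calls GET {baseUrl}/_cat/plugins?h=component with HTTP basic authentication.
    // TLS certificate validation uses the JVM's default trust store.
    static String fetch(String baseUrl, String user, String password)
            throws IOException, InterruptedException {
        String token = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/_cat/plugins?h=component"))
                .header("Authorization", "Basic " + token)
                .GET()
                .build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }

    // True if any line of the _cat/plugins body equals the component name.
    static boolean hasComponent(String catBody, String component) {
        return Arrays.stream(catBody.split("\\R"))
                .map(String::trim)
                .anyMatch(component::equals);
    }
}
```

Usage would look like CatPlugins.hasComponent(CatPlugins.fetch("https://localhost:9200", "elastic", password), "ingest"); in Kibana you can simply run GET _cat/plugins?h=component instead.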

Once Elasticsearch has restarted, open Kibana and run the following request in Dev Tools to test the processor:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "This is a test for my custom pipeline",
    "processors": [
      {
        "sample": {
          "field": "user",
          "target_field": "user_initial"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "user": "xiaoguo"
      }
    },
    {
      "_source": {
        "user": "Liu"
      }
    }
  ]
}
In the test above, the user field has the values xiaoguo and Liu respectively. After passing through our sample processor, the result is:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "user_initial": "x",
          "user": "xiaoguo"
        },
        "_ingest": {
          "timestamp": "2022-09-08T04:00:48.922489Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "user_initial": "l",
          "user": "Liu"
        },
        "_ingest": {
          "timestamp": "2022-09-08T04:00:48.922514Z"
        }
      }
    }
  ]
}

The first letter was extracted, converted to lowercase (Liu became l), and written into the user_initial field we specified.
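To reason about these results without a cluster, the processor's rules can be re-enacted on a plain Map standing in for _source. This is a simplified, hypothetical sketch (SampleSimulation is not Elasticsearch code): it handles only top-level field names, not dotted paths or arrays, and only non-empty string values.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical plain-Java re-enactment of the sample processor's rules on one _source map.
final class SampleSimulation {

    private SampleSimulation() {
    }

    static Map<String, Object> apply(Map<String, Object> source, String field, String targetField,
                                     boolean ignoreMissing) {
        // Missing source field: skip or fail, depending on ignore_missing
        if (!source.containsKey(field)) {
            if (ignoreMissing) {
                return source;
            }
            throw new IllegalArgumentException("field [" + field + "] not present");
        }
        // Never overwrite an existing target field
        if (source.containsKey(targetField)) {
            throw new IllegalArgumentException("field [" + targetField + "] already exists");
        }
        Map<String, Object> result = new HashMap<>(source);
        Object value = source.get(field);
        if (value instanceof String) {
            String trimmed = ((String) value).trim();
            if (!trimmed.isEmpty()) {
                result.put(targetField, trimmed.substring(0, 1).toLowerCase(Locale.ROOT));
            }
        }
        return result;
    }
}
```

Applied to the two test documents with target user_initial, it reproduces the simulated output: x for xiaoguo and l for Liu, with the original user value left unchanged.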

To make it easier to follow along, I have put the final code in this repository: https://github.com/liu-xiao-guo/es-ingest-pipeline
