Elasticsearch:Ingest pipeline 介绍

Posted Elastic 中国社区官方博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch:Ingest pipeline 介绍相关的知识,希望对你有一定的参考价值。

Ingest pipeline 可让你在索引之前对数据执行常见转换。 例如,你可以使用 pipeline 删除字段、从文本中提取值并丰富你的数据。

Pipeline 由一系列称为处理器(processors)的可配置任务组成。 每个处理器按顺序运行,对传入文档进行特定更改。 处理器运行后,Elasticsearch 会将转换后的文档添加到您的数据流或索引中。

你可以使用 Kibana 的 Ingest Pipelines 功能或 ingest APIs 创建和管理摄取管道。 Elasticsearch 以集群状态存储管道。 

前提条件

在如下的展示中,我将使用 Elastic Stack 8.3.3 来进行展示,尽管不同的版本的界面可能稍有不同。

创建及管理 pipeline

在 Kibana 中,打开主菜单并单击 Stack Management > Ingest Pipelines。 从列表视图中,你可以:

  • 查看管道列表并深入了解详细信息
  • 编辑或克隆现有管道
  • 删除管道

 

要创建管道,请单击上面的 Create pipeline > New pipeline。 有关示例教程,请参阅示例:ingest pipeline 使用示例 - 解析常用日志格式。 

你还可以使用摄取 API 来创建和管理 pipeline。 以下创建 pipeline API 请求创建一个 pipeline,其中包含两个 set 处理器,后跟一个 lowercase 处理器。 处理器按指定的顺序依次运行。

PUT _ingest/pipeline/my-pipeline

  "description": "My optional pipeline description",
  "processors": [
    
      "set": 
        "description": "My optional processor description",
        "field": "my-long-field",
        "value": 10
      
    ,
    
      "set": 
        "description": "Set 'my-boolean-field' to true",
        "field": "my-boolean-field",
        "value": true
      
    ,
    
      "lowercase": 
        "field": "my-keyword-field"
      
    
  ]

在上面,我们定义了一个 pipeline。它在执行的时候是从上往下依次执行的。它为一个文档添加一个叫做 my-long-field 的字段,添加一个叫做 my-boolean-field 的字段,以及把 my-keyword-field 字段的字母都变为小写字母。我们可以使用如下的例子来进行检验:

PUT my_index/_doc/1?pipeline=my-pipeline

  "my-keyword-field": "Hi, this is Xiaoguo Liu"

在上面,我们创建一个叫做 my_index 的索引,并写入一个文件。我们可以使用如下的命令来检查写入的结果:

GET my_index/_search?filter_path=hits.hits._source

上面的命令返回的结果是:


  "hits": 
    "hits": [
      
        "_source": 
          "my-long-field": 10,
          "my-keyword-field": "hi, this is xiaoguo liu",
          "my-boolean-field": true
        
      
    ]
  

显然,它和我们之前的所述的是一样的结果。我们可以在链接找到更多的 pipeline processors。我们甚至可以使用如下的 API 来获得所有的 pipeline processors:

GET _nodes/ingest?filter_path=nodes.*.ingest.processors

上面的命令显示为:


  "nodes": 
    "EGibleagSe6UJMBgbuPbIA": 
      "ingest": 
        "processors": [
          
            "type": "append"
          ,
          
            "type": "bytes"
          ,
          
            "type": "circle"
          ,
          
            "type": "community_id"
          ,
          
            "type": "convert"
          ,
          
            "type": "csv"
          ,
          
            "type": "date"
          ,
          
            "type": "date_index_name"
          ,
          
            "type": "dissect"
          ,
          ...
        ]
      
    
  

管理 pipeline 版本

创建或更新 pipeline 时,可以指定可选的版本整数。 你可以将此版本号与 if_version 参数一起使用,以有条件地更新 pipeline。 当指定 if_version 参数时,成功的更新会增加 pipeline 的版本。

PUT _ingest/pipeline/my-pipeline-id

  "version": 1,
  "processors": [ ... ]

要使用 API 取消设置版本号,请在不指定版本参数的情况下替换或更新 pipeline。

测试 pipeline

在生产中使用 pipeline 之前,我们建议你i使用示例文档对其进行测试。 在 Kibana 中创建或编辑 pipeline 时,单击添加文档。 在 Documents 选项卡中,提供示例文档并单击 Run the pipeline。

这个在我之前的文章 “Elasticsearch:ingest pipeline 使用示例 - 解析常用日志格式”  有详细的描述。

你还可以使用模拟 pipeline API 测试 pipeline。 你可以在请求路径中指定配置的 pipeline。 例如,以下请求测试。

POST _ingest/pipeline/_simulate

  "pipeline": 
    "processors": [
      
        "grok": 
          "description": "Extract fields from 'message'",
          "field": "message",
          "patterns": [
            """%IPORHOST:source.ip %USER:user.id %USER:user.name \\[%HTTPDATE:@timestamp\\] "%WORD:http.request.method %DATA:url.original HTTP/%NUMBER:http.version" %NUMBER:http.response.status_code:int (?:-|%NUMBER:http.response.body.bytes:int) %QS:http.request.referrer %QS:user_agent"""
          ]
        
      ,
      
        "date": 
          "description": "Format '@timestamp' as 'dd/MMM/yyyy:HH:mm:ss Z'",
          "field": "@timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ]
        
      ,
      
        "geoip": 
          "description": "Add 'source.geo' GeoIP data for 'source.ip'",
          "field": "source.ip",
          "target_field": "source.geo"
        
      ,
      
        "user_agent": 
          "description": "Extract fields from 'user_agent'",
          "field": "user_agent"
        
      
    ]
  ,
  "docs": [
    
      "_source": 
        "message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \\"GET /favicon.ico HTTP/1.1\\" 200 3638 \\"-\\" \\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (Khtml, like Gecko) Chrome/52.0.2743.116 Safari/537.36\\""
      
    
  ]

或者,你可以先创建一个 pipeline:

PUT _ingest/pipeline/common_log_format

  "description": "A pipeline to structure common logs ",
  "processors": [
    
      "grok": 
        "field": "message",
        "patterns": [
          "%IPORHOST:source.ip %USER:user.id %USER:user.name \\\\[%HTTPDATE:@timestamp\\\\] \\"%WORD:http.request.method %DATA:url.original HTTP/%NUMBER:http.version\\" %NUMBER:http.response.status_code:int (?:-|%NUMBER:http.response.body.bytes:int) %QS:http.request.referrer %QS:user_agent"
        ]
      
    ,
    
      "date": 
        "field": "@timestamp",
        "formats": [
          "dd/MMM/yyyy:HH:mm:ss Z"
        ],
        "output_format": "yyyy-MMM-dd'T'HH:mm:ss Z"
      
    ,
    
      "geoip": 
        "field": "source.ip",
        "target_field": "source.geo"
      
    ,
    
      "user_agent": 
        "field": "user_agent"
      
    
  ]

然后,我们再用如下的方法来进行测试:

POST _ingest/pipeline/common_log_format/_simulate

  "description": "A pipeline to structure common logs ",
  "processors": [
    
      "grok": 
        "field": "message",
        "patterns": [
          """%IPORHOST:source.ip %USER:user.id %USER:user.name \\[%HTTPDATE:@timestamp\\] "%WORD:http.request.method %DATA:url.original HTTP/%NUMBER:http.version" %NUMBER:http.response.status_code:int (?:-|%NUMBER:http.response.body.bytes:int) %QS:http.request.referrer %QS:user_agent"""
        ]
      
    ,
    
      "date": 
        "field": "@timestamp",
        "formats": [
          "dd/MMM/yyyy:HH:mm:ss Z"
        ],
        "output_format": "yyyy-MMM-dd'T'HH:mm:ss Z"
      
    ,
    
      "geoip": 
        "field": "source.ip",
        "target_field": "source.geo"
      
    ,
    
      "user_agent": 
        "field": "user_agent"
      
    
  ],
  "docs": [
    
      "_source": 
        "message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \\"GET /favicon.ico HTTP/1.1\\" 200 3638 \\"-\\" \\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\\""
      
    
  ]

上面的命令将返回如下的结果:


  "docs": [
    
      "doc": 
        "_index": "_index",
        "_id": "_id",
        "_source": 
          "@timestamp": "2099-May-05T16:21:15 +0000",
          "http": 
            "request": 
              "method": "GET",
              "referrer": "\\"-\\""
            ,
            "version": "1.1",
            "response": 
              "body": 
                "bytes": 3638
              ,
              "status_code": 200
            
          ,
          "source": 
            "geo": 
              "continent_name": "Europe",
              "region_iso_code": "DE-BE",
              "city_name": "Berlin",
              "country_iso_code": "DE",
              "country_name": "Germany",
              "region_name": "Land Berlin",
              "location": 
                "lon": 13.3878,
                "lat": 52.5312
              
            ,
            "ip": "212.87.37.154"
          ,
          "message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \\"GET /favicon.ico HTTP/1.1\\" 200 3638 \\"-\\" \\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\\"",
          "user": 
            "name": "-",
            "id": "-"
          ,
          "url": 
            "original": "/favicon.ico"
          ,
          "user_agent": 
            "name": "Chrome",
            "original": "\\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\\"",
            "os": 
              "name": "Mac OS X",
              "version": "10.11.6",
              "full": "Mac OS X 10.11.6"
            ,
            "device": 
              "name": "Mac"
            ,
            "version": "52.0.2743.116"
          
        ,
        "_ingest": 
          "timestamp": "2022-08-10T07:39:53.508450216Z"
        
      
    
  ]

将 pipeline 添加到索引请求

使用 pipeline 查询参数将管道应用于单个或批量索引请求中的文档。我们首先来创建一个 index template。这个 index template 含有 data stream

PUT _index_template/my-data-stream-template

  "index_patterns": [ "my-data-stream*" ],
  "data_stream":  ,
  "priority": 500

我们使用如下的方法来把一些数据写入到一个叫做 my-data-stream 的 data stream 中去:

POST my-data-stream/_doc?pipeline=my-pipeline

  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"


PUT my-data-stream/_bulk?pipeline=my-pipeline
 "create":  
 "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" 
 "create":  
 "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" 

我们可以通过如下的命令来查看写入的文档:

GET my-data-stream/_search?filter_path=**.hits

上面的命令返回的结果为:


  "hits": 
    "hits": [
      
        "_index": ".ds-my-data-stream-2022.08.10-000001",
        "_id": "Pai4hoIBT1laGVGsvcR4",
        "_score": 1,
        "_source": 
          "my-long-field": 10,
          "@timestamp": "2099-03-07T11:04:06.000Z",
          "my-keyword-field": "foo",
          "my-boolean-field": true
        
      ,
      
        "_index": ".ds-my-data-stream-2022.08.10-000001",
        "_id": "Pqi4hoIBT1laGVGsvcR4",
        "_score": 1,
        "_source": 
          "my-long-field": 10,
          "@timestamp": "2099-03-07T11:04:07.000Z",
          "my-keyword-field": "bar",
          "my-boolean-field": true
        
      ,
      
        "_index": ".ds-my-data-stream-2022.08.10-000001",
        "_id": "PKi4hoIBT1laGVGsssSh",
        "_score": 1,
        "_source": 
          "my-long-field": 10,
          "@timestamp": "2099-03-07T11:04:05.000Z",
          "my-keyword-field": "foo",
          "my-boolean-field": true
        
      
    ]
  

你还可以将 pipeline 参数与通过 update_by_queryreindex  API 一起使用。

POST my-data-stream/_update_by_query?pipeline=my-pipeline

POST _reindex

  "source": 
    "index": "my-data-stream"
  ,
  "dest": 
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  

设置默认 pipeline

使用 index.default_pipeline 索引设置来设置默认 pipeline。 如果未指定 pipeline 参数,Elasticsearch 会将此 pipeline 应用于索引请求。比如,当我创建一个如下的索引:

PUT my_index1

  "settings": 
    "index.default_pipeline": "my-pipeline"
  


PUT my_index1/_doc/1

  "my-keyword-field": "FOO"

虽然在上面,在我们创建文档时,我们没有指定任何的 pipeline,但是它在默认的情况下,就自动将 pipeline 应用于该索引。我们使用如下的命令来查看:

GET my_index1/_search?filter_path=**.hits

  "hits": 
    "hits": [
      
        "_index": "my_index1",
        "_id": "1",
        "_score": 1,
        "_source": 
          "my-long-field": 10,
          "my-keyword-field": "foo",
          "my-boolean-field": true
        
      
    ]
  

设置最终 pipeline

使用 index.final_pipeline 索引设置来设置最终 pipeline。 Elasticsearch 在请求或默认 pipeline 之后应用此 pipeline,即使两者都未指定。

PUT _ingest/pipeline/my-final-pipeline

  "description": "Increase my-long-field by 10", 
  "processors": [
    
      "script": 
        "source": """
          ctx['my-long-field'] += 10 
        """
      
    
  ]

在上面,我们创建一个 my-final-pipeline 的 pipeline。它把字段 my-long-field 的值增加 10。能够完成这个操作的前提是 my-long-field 字段已经存在,否则我们的这个 pipeline 就好出问题。现在我们来创建一个叫做 my_index2 的索引:

PUT my_index2

  "settings": 
    "index.default_pipeline": "my-pipeline",
    "index.final_pipeline": "my-final-pipeline"
  

在上面,我们定义了两个 pipeline:默认的及最终的。按照执行顺序来说,默认的先执行,然后才是 final。我们写入如下的文档:

PUT my_index2/_doc/1

  "my-keyword-field": "FOO"

我们通过如下的命令来进行查看:

GET my_index2/_search?filter_path=**.hits

上面的命令显示的结果为:


  "hits": 
    "hits": [
      
        "_index": "my_index2",
        "_id": "1",
        "_score": 1,
        "_source": 
          "my-long-field": 20,
          "my-keyword-field": "foo",
          "my-boolean-field": true
        
      
    ]
  

显然,my-long-field 字段的值现在为 20,而不是之前的 10。

在 Beats 中进行配置

要将摄取 pipeline 添加到 Elastic Beats,请在 <BEAT_NAME>.yml 的 output.elasticsearch 下指定 pipeline 参数。 例如,对于 Filebeat,你可以在 filebeat.yml 中指定管道。

output.elasticsearch:
  hosts: ["localhost:9200"]
  pipeline: my-pipeline

用于 Fleet 和 Elastic Agent 的 pipeline

Fleet 自动为其集成添加摄取 pipeline。 Fleet 使用包含为索引设置默认 pipeline 以及索引模板应用这些 pipeline。 Elasticsearch 根据 datastream 的命名方案将这些模板与你的 Fleet 数据流进行匹配。你可以阅读我之前的文章 “Observability:如何使用 Elastic Agents 把定制的日志摄入到 Elasticsearch 中”。

警告:不要更改 Fleet 的摄取 pipeline 或使用自定义 pipeline 进行 Fleet 集成。 这样做可能会破坏你的 Fleet 数据流。

Fleet 不为 Custom logs 集成提供摄取 pipeline。 你可以通过以下两种方式之一安全地为此集成指定 pipeline:index templatecustom configuraiton

选项一:index template

1)创建并测试你的摄取 pipeline。 将你的 pipeline 命名为 logs-<dataset-name>-default。 这使得跟踪集成 pipeline 变得更加容易。

例如,以下请求为 my-app 数据集创建管道。 管道的名称是 logs-my_app-default。

PUT _ingest/pipeline/logs-my_app-default

  "description": "Pipeline for `my_app` dataset",
  "processors": [ ... ]

2)创建一个索引模板,在 index.default_pipeline 或 index.final_pipeline 索引设置中包含你的 pipeline。 确保模板已启用数据流。 模板的索引模式应该匹配 logs-<dataset-name>-*。

你可以使用 Kibana 的索引管理功能或创建索引模板 API 创建此模板。

例如,以下请求会创建一个匹配 logs-my_app-* 的模板。 该模板使用包含 index.default_pipeline 索引设置的组件模板。

# Creates a component template for index settings
PUT _component_template/logs-my_app-settings

  "template": 
    "settings": 
      "index.default_pipeline": "logs-my_app-default",
      "index.lifecycle.name": "logs"
    
  


# Creates a mapping for index
PUT _component_template/logs-my_app-mappings

  "template": 
    "mappings": 
      "_source": 
        "enabled": false
      ,
      "properties": 
        "host_name": 
          "type": "keyword"
        ,
        "created_at": 
          "type": "date",
          "format": "EEE MMM dd HH:mm:ss Z yyyy"
        
      
    
  


# Creates an index template matching `logs-my_app-*`
PUT _index_template/logs-my_app-template

  "index_patterns": ["logs-my_app-*"],
  "data_stream":  ,
  "priority": 500,
  "composed_of": ["logs-my_app-settings", "logs-my_app-mappings"]

3)在 Fleet 中添加或编辑 Custom logs 集成时,单击 Configure integration > Custom log file > Advanced options.。

4)在数据集名称中,指定数据集的名称。 Fleet 会将用于集成的新数据添加到生成的 logs-<dataset-name>-default 数据流中。

例如,如果你的数据集名称是 my_app,Fleet 会将新数据添加到 logs-my_app-default 数据流。

5)使用 rollover API 翻转您的数据流。 这可确保 Elasticsearch 将索引模板及其 pipeline 设置应用于任何新数据以进行集成。

选择二:custom configuration

关于这个配置,我已经在我之前的文章  “Observability:如何使用 Elastic Agents 把定制的日志摄入到 Elasticsearch 中” 做了详尽的描述。这里就不再重复了。

Elastic Agent standalone

如果你独立运行 Elastic Agent,则可以使用包含 index.default_pipeline 或 index.final_pipeline 索引设置的索引模板应用管道。 或者,你可以在 elastic-agent.yml 配置中指定 pipeline 策略设置。 请参阅安装独立的 Elastic Agent

访问处理器中的源字段

处理器对传入文档的源字段具有读写访问权限。 要访问处理器中的字段键,请使用其字段名称。 以下 set 处理器访问 my-long-field。

PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "set": 
        "field": "my-long-field",
        "value": 10
      
    
  ]

你还可以添加 _source 前缀。

PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "set": 
        "field": "_source.my-long-field",
        "value": 10
      
    
  ]

使用点表示法访问对象字段。

重要:如果你的文档包含展平对象,请先使用 dot_expander 处理器展开它们。 其他摄取处理器无法访问展平对象。

PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "dot_expander": 
        "description": "Expand 'my-object-field.my-property'",
        "field": "my-object-field.my-property"
      
    ,
    
      "set": 
        "description": "Set 'my-object-field.my-property' to 10",
        "field": "my-object-field.my-property",
        "value": 10
      
    
  ]

比如,我们有如下的文档:

PUT flattened_obj/_doc/1?pipeline=my-pipeline

  "my-object-field.my-property": 100

最终写入的文档为:

GET flattened_obj/_search?filter_path=**.hits

  "hits": 
    "hits": [
      
        "_index": "flattened_obj",
        "_id": "1",
        "_score": 1,
        "_source": 
          "my-object-field": 
            "my-property": 10
          
        
      
    ]
  

我们可以看出来 my-property 是 my-object-field 里的一个子字段。

有几个处理器参数支持 Mustache 模板片段。 要访问模板片段中的字段值,请将字段名称括在三个大括号中:field-name。 你可以使用模板片段来动态设置字段名称。

PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "set": 
        "description": "Set dynamic '<service>' field to 'code' value",
        "field": "service",
        "value": "code"
      
    
  ]

在处理器中访问摄取元数据

摄取处理器可以使用 _ingest 键添加和访问摄取元数据。

与源和元数据字段不同,Elasticsearch 默认不索引摄取元数据字段。 Elasticsearch 还允许以 _ingest 键开头的源字段。 如果你的数据包含此类源字段,请使用 _source._ingest 访问它们。

默认情况下,pipelie 仅创建 _ingest.timestamp 摄取元数据字段。 该字段包含 Elasticsearch 收到文档索引请求的时间戳。 要索引 _ingest.timestamp 或其他摄取元数据字段,请使用 set 处理器。

PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "set": 
        "description": "Index the ingest timestamp as 'event.ingested'",
        "field": "event.ingested",
        "value": "_ingest.timestamp"
      
    
  ]

处理 pipeline 故障

Pipeline 的处理器按顺序运行。 默认情况下,当这些处理器之一发生故障或遇到错误时,pipeline 处理将停止。

要忽略处理器故障并运行管道的剩余处理器,请将 ignore_failure 设置为 true。详细阅读请参阅之前的文章 “Elasticsearch:如何处理 ingest pipeline 中的异常”。

PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "rename": 
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "ignore_failure": true
      
    
  ]

使用 on_failure 参数指定在处理器发生故障后立即运行的处理器列表。 如果指定了 on_failure,即使 on_failure 配置为空,Elasticsearch 也会随后运行 pipeline 的剩余处理器。

PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "rename": 
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          
            "set": 
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            
          
        ]
      
    
  ]

嵌套用于嵌套错误处理的 on_failure 处理器列表。

PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "rename": 
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          
            "set": 
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false,
              "on_failure": [
                
                  "set": 
                    "description": "Set 'error.message.multi'",
                    "field": "error.message.multi",
                    "value": "Document encountered multiple ingest errors",
                    "override": true
                  
                
              ]
            
          
        ]
      
    
  ]

你还可以为 pipeline 指定 on_failure。 如果没有 on_failure 值的处理器出现故障,Elasticsearch 会使用此 pipeline 级参数作为备用参数。 Elasticsearch 不会尝试运行 pipeline 的剩余处理器。

PUT _ingest/pipeline/my-pipeline

  "processors": [ ... ],
  "on_failure": [
    
      "set": 
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed- _index "
      
    
  ]

有关 pipeline 故障的其他信息可能在文档元数据字段 on_failure_message、on_failure_processor_type、on_failure_processor_tag 和 on_failure_pipeline 中可用。 这些字段只能从 on_failure 块中访问。

以下示例使用元数据字段在文档中包含有关管道故障的信息。

PUT _ingest/pipeline/my-pipeline

  "processors": [ ... ],
  "on_failure": [
    
      "set": 
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor  _ingest.on_failure_processor_type  with tag  _ingest.on_failure_processor_tag  in pipeline  _ingest.on_failure_pipeline  failed with message  _ingest.on_failure_message "
      
    
  ]

有条件地运行处理器

每个处理器都支持可选的 if 条件,编写为 Painless 脚本。 如果提供,则处理器仅在 if 条件为真时运行。

重要:if 条件脚本在 Painless 的摄取处理器上下文中运行。 在 if 条件下,ctx 值是只读的。

PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "drop": 
        "description": "Drop documents with 'network.name' of 'Guest'",
        "if": "ctx?.network?.name == 'Guest'"
      
    
  ]

如果启用了 script.painless.regex.enabled 集群设置,你可以在 if 条件脚本中使用正则表达式。 有关支持的语法,请参阅 Painless 正则表达式

提示:如果可能,请避免使用正则表达式。 昂贵的正则表达式会降低索引速度。

PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "set": 
        "description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
        "if": "ctx.url?.scheme =~ /^http[^s]/",
        "field": "url.insecure",
        "value": true
      
    
  ]

你必须在一行中将 if 条件指定为有效 JSON。 但是,您可以使用 Kibana 控制台的三引号语法来编写和调试更大的脚本。

提示:如果可能,请避免使用复杂或昂贵的 if 条件脚本。 昂贵的条件脚本会降低索引速度。

PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "drop": 
        "description": "Drop documents that don't contain 'prod' tag",
        "if": """
            Collection tags = ctx.tags;
            if(tags != null)
              for (String tag : tags) 
                if (tag.toLowerCase().contains('prod')) 
                  return false;
                
              
            
            return true;
        """
      
    
  ]

你还可以将存储的脚本指定为 if 条件。

PUT _scripts/my-prod-tag-script

  "script": 
    "lang": "painless",
    "source": """
      Collection tags = ctx.tags;
      if(tags != null)
        for (String tag : tags) 
          if (tag.toLowerCase().contains('prod')) 
            return false;
          
        
      
      return true;
    """
  


PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "drop": 
        "description": "Drop documents that don't contain 'prod' tag",
        "if":  "id": "my-prod-tag-script" 
      
    
  ]

传入文档通常包含对象字段。 如果处理器脚本尝试访问其父对象不存在的字段,Elasticsearch 将返回 NullPointerException。 要避免这些异常,请使用 null 安全运算符,例如 ?.,并将脚本编写为 null 安全的。

例如, ctx.network?.name.equalsIgnoreCase('Guest') 不是 null 安全的。 ctx.network?.name 可以返回 null。 将脚本重写为 'Guest'.equalsIgnoreCase(ctx.network?.name),这是 null 安全的,因为 Guest 始终为非 null。

如果你无法将脚本重写为空安全,请包含显式 null 检查。

PUT _ingest/pipeline/my-pipeline

  "processors": [
    
      "drop": 
        "description": "Drop documents that contain 'network.name' of 'Guest'",
        "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
      
    
  ]

有条件地应用管道

将 if 条件与管道处理器相结合,以根据你的标准将其他管道应用于文档。 你可以将此管道用作用于配置多个数据流或索引的索引模板中的默认管道。

PUT _ingest/pipeline/one-pipeline-to-rule-them-all

  "processors": [
    
      "pipeline": 
        "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      
    ,
    
      "pipeline": 
        "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      
    ,
    
      "fail": 
        "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      
    
  ]

获取管道使用统计信息

使用 node stats API 获取全局和每个 pipeline 的摄取统计信息。 使用这些统计信息来确定哪些 pipeline 运行最频繁或花费最多时间处理。

GET _nodes/stats/ingest?filter_path=nodes.*.ingest

  "nodes": 
    "EGibleagSe6UJMBgbuPbIA": 
      "ingest": 
        "total": 
          "count": 17,
          "time_in_millis": 7,
          "current": 0,
          "failed": 2
        ,
        "pipelines": 
          "sample-pipeline": 
            "count": 0,
            "time_in_millis": 0,
            "current": 0,
            "failed": 0,
            "processors": []
          ,
          "common_log_format": 
            "count": 2,
            "time_in_millis": 32,
            "current": 0,
            "failed": 0,
            "processors": [
              
                "grok": 
                  "type": "grok",
                  "stats": 
                    "count": 2,
                    "time_in_millis": 6,
                    "current": 0,
                    "failed": 0
                  
                
              ,
              
                "date": 
                  "type": "date",
                  "stats": 
                    "count": 2,
                    "time_in_millis": 3,
                    "current": 0,
                    "failed": 0
                  
                
              ,
        ...

以上是关于Elasticsearch:Ingest pipeline 介绍的主要内容,如果未能解决你的问题,请参考以下文章

Elasticsearch 安装Ingest User-Agent插件(ingest-user-agent)

Elasticsearch 集群的哪个节点(master、data、ingest)从 logstash 收集数据?

Elasticsearch:Ingest Pipeline 实践

Elasticsearch:Ingest Pipeline 实践

Elasticsearch:Ingest pipeline 介绍

Elasticsearch的ETL利器——Ingest节点