ES实战reindex API的使用

Posted 顧棟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ES实战reindex API的使用相关的知识,希望对你有一定的参考价值。

Reindex API的使用

可以用来处理大分片和数据迁移,以及索引规整

使用前提

  • 源索引中的所有文档启用_source
  • Reindex 不会尝试设置目标索引。 它不会复制源索引的设置。 您应该在运行_reindex操作之前设置目标索引,包括设置映射、分片计数、副本等。
  • Reindex API 不努力处理 ID 冲突,因此只保留最后写入的文档,但顺序通常不可预测,因此依赖此行为不是一个好主意。 相反,请使用脚本确保 ID 是唯一的。

主要功能

简单功能:

  • 将源索引的文档复制到目标索引

复杂功能:

  • 将源索引的部分字段进行复制到目标索引

  • 脚本修改文档和文档元数据

  • 跨集群复制索引

  • Reindex API 同样支持 refresh, wait_for_completion, wait_for_active_shards, timeout, scroll, and requests_per_second.

使用举例

本集群复制

POST _reindex

  "source": 
    "index": "twitter"
  ,
  "dest": 
    "index": "new_twitter"
  

必要参数:

  • source

    源索引的部分

    • index:索引名称,可以是一个列表,取值方式[“indexName1”,“indexName1”]
  • dest

    目标索引部分

    • index:索引名称

可选参数:

  • version_type

    在dest参数下,取值如下

    • internal:将数据盲目的从源索引写入目标索引,对于相同的type和id,会进行数据覆盖
    • external:代表以源索引中的数据为主,出现冲突是保留源索引的数据。
  • conflicts

    默认情况下,版本冲突会中止 _reindex进程。将这个参数设置成proceed可以继续进程并统计出现版本冲突的次数

  • op_type

    在dest参数中取值:

    • create:代表只向目标索引插入源索引中与目标索引不同的文档,相同的文档会报版本冲突。
  • query

    在source参数下,就是一个查询的语句,对源索引的数据进行删选

  • size

    代表从源索引中取出的数据量,一般与sort参数结合使用。

  • size

    与上面的size不同,这是在source参数下的,代表一批次的数据量,默认值1000。

  • sort

    代表对源索引中的数据进行排序,例如取值 "date": "desc" 使用date字段进行desc排序,一般与size参数结合使用。

将源索引的部分字段进行复制到目标索引

在源索引中增加_source参数可以进行字段的选择

POST _reindex

  "source": 
    "index": "twitter",
    "_source": ["user", "_doc"]
  ,
  "dest": 
    "index": "new_twitter"
  

使用脚本脚本修改文档和文档元数据

POST _reindex

  "source": 
    "index": "twitter"
  ,
  "dest": 
    "index": "new_twitter",
    "version_type": "external"
  ,
  "script": 
    "source": "if (ctx._source.foo == 'bar') ctx._version++; ctx._source.remove('foo')",
    "lang": "painless"
  

ctx可以设置的属性:

  • op

    可以通过op来进行修改对目标索引的操作

    • noop:忽略文档,不对这个索引进行操作
    • delete:删除该文档,从目标索引中将这个文档删除
  • _id

    可以查询满足条件的id,并修改id写入目标索引

  • _type

    可以查询满足条件的type,并修改id写入目标索引

  • _index

    可以使用通过不等式进行匹配

    "script": 
        "lang": "painless",
        "source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"
      
    
  • _version

    将 _version 设置为 null 或从 ctx 映射中清除它就像不在索引请求中发送版本一样; 无论目标上的版本或您在 _reindex 请求中使用的版本类型如何,它都会导致目标索引中的文档被覆盖。

  • _routing

    默认情况下,如果 _reindex 看到带有路由的文档,则除非脚本更改路由,否则将保留路由。

    • keep:默认值,保留源索引文档上的路由
    • discard:取消路由
    • =<some text>:修改文档的路由为=之后的值,写入目标索引

重新路由目标索引文档

POST _reindex

  "source": 
    "index": "source",
    "size": 100
  ,
  "dest": 
    "index": "dest",
    "routing": "=cat"
  

跨集群复制索引

这是是将A集群中的a1索引复制到B集群的b1索引。那么源集群就是A集群,源索引就是a1,目标集群就是B,目标索引就是b1。因为源集群会向目标集群发送写请求。所以要满足以下前提:

  • 在B集群上的elasticsearch.yaml中需要配置reindex.remote.whitelist属性,值为集群A的HTTP访问地址。
  • 跨集群一次只支持一个reindex任务
reindex.remote.whitelist: "otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*"

在B集群(目标集群)执行reindex命令

POST _reindex

    "source": 
        "remote": 
            "host": "http:// $source_cluster_ip:$httpPort",
            "socket_timeout": "60s",
            "connect_timeout": "60s"
        ,
        "index": "a1",
    ,
    "dest": 
        "index": "b1"
    

参数说明

host参数中配置源集群的任意访问地址。

socket_timeoutconnect_timeout的默认值是30s。

修改目标字段名称

将源索引flag字段的内容写入目标索引的tag字段中。

POST _reindex

  "source": 
    "index": "test"
  ,
  "dest": 
    "index": "test2"
  ,
  "script": 
    "source": "ctx._source.tag = ctx._source.remove(\\"flag\\")"
  

分片

Reindex 支持 Sliced Scroll 来并行化重新索引过程。

手动切片

POST _reindex

  "source": 
    "index": "twitter",
    "slice": 
      "id": 0,
      "max": 2
    
  ,
  "dest": 
    "index": "new_twitter"
  

POST _reindex

  "source": 
    "index": "twitter",
    "slice": 
      "id": 1,
      "max": 2
    
  ,
  "dest": 
    "index": "new_twitter"
  

自动切片

默认使用_uid来切分。slices的值最理想应该是主分片的个数,如果该数字很大(例如 500),请选择较小的数字,因为切片过多会影响性能。 设置高于分片数量的切片通常不会提高效率并增加开销。

POST _reindex?slices=5&refresh

  "source": 
    "index": "twitter"
  ,
  "dest": 
    "index": "new_twitter"
  

批量reindex

可以使用shell脚本实现。思路就是批量调用reindex API。

for index in i1 i2 i3 i4 i5; do
  curl -HContent-Type:application/json -XPOST localhost:9200/_reindex?pretty -d'
    "source": 
      "index": "'$index'"
    ,
    "dest": 
      "index": "'$index'-reindexed"
    
  '
done

reindex任务管理

查看任务API

通过这个API可以查询在进行reindex的任务

GET _tasks?detailed=true&actions=*reindex

取消任务API

通过上面查询出来的node和id的组合可以进行任务的取消

POST _tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel

动态限流

和在初次调用reindex API URL中使用一样,requests_per_second ,代表每秒的请求数,默认值为-1,代表不限流,其值可以是任何十进制数。

限流的实现方式,每一个reindex都有单批次的文档数据量(size,默认1000),假设requests_per_second =500,那么一批次的执行的目标时间就是 t a r g e t _ t i m e = s i z e r e q u e s t s _ p e r _ s e c o n d s target\\_time= \\fracsizerequests\\_per\\_second s target_time=requests_per_secondsizes​=1000/500=2,等待时间(集群休息的时间) w a i t _ t i m e = t a r g e t _ t i m e − w r i t e _ t i m e wait\\_time= target\\_time - write\\_time wait_time=target_timewrite_time​​​=2-0.5=1s。那么就会有1s的等待时间,实现了限流降压。

加速查询的重新节流立即生效,但减慢查询的重新节流将在完成当前批处理后生效。 这可以防止滚动超时。

POST _reindex/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1

问题总结

问题:为何会导致目标索引的数据少于源索引

  • reindex执行的时候取的是源索引的执行reindex命令的快照,如果源索引没有停写,会出现整体数据量不一致,建议:1、停写源索引 2、按条件分批次执行reindex。

问题:为何执行效率很低,具体原理未查明。

  • 实践表明,一次reindex的数据量过大的话,执行的到后面会越来越慢,建议:控制一次reIndex的大小,一般一批次执行200W-300W的数据量。也可以是用切片的方式,但是要注意集群的压力情况。
  • 当目标索引中拥有数据,会进行Update操作,也会影响执行效率,建议:可以前期先删除目标索引中无效的数据,减少update次数。

源码分析

待补充

以上是关于ES实战reindex API的使用的主要内容,如果未能解决你的问题,请参考以下文章

本地ES集群数据通过_reindex方式迁移到腾讯云服务器(亲测有效)

ES重建索引(reindex)性能优化建议

ES数据库重建索引——Reindex(数据迁移)

es如何修改es索引字段类型 reindex

干货 | Elasticsearch Reindex性能提升10倍+实战(转)

ES索引迁移