在ES批量插入数据超时时自动重试
Posted tjp40922
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在ES批量插入数据超时时自动重试相关的知识,希望对你有一定的参考价值。
当我们使用ES批量插入数据的时候,一般会这样写代码:
from elasticsearch import Elasticsearch,helpers es =Elasticsearch(hosts=[{‘host‘:‘localhost‘,‘port‘:9200}]) def gendata(): mywords =[‘foo‘,‘bar‘,‘baz‘] for word in mywords: yield {"_index":"mywords","_type":"document","_type":"document","doc":{"word": word}} helpersbulk(es,gendata())
但当ES的负荷过大时,这种写法可能会抛出连接超时的异常。
为了解决这个问题,在初始化ES连接对象时,可以设置一个更大的超时时间:
es = Elasticsearch(hosts=[{‘host‘: ‘localhost‘, ‘port‘: 9200}], timeout=60)
但有时候,即时设置为60秒还是有可能遇到超时异常,但这个超时时间并非越大越好,所以最好能够让ES在遇到超时的情况下自动重试。
在创建ES连接对象时,还可以再加两个参数,实现超时自动重试3次:
es = Elasticsearch(hosts=[{‘host‘: ‘localhost‘, ‘port‘: 9200}], timeout=60, max_retries=3, retry_on_timeout=True)
通过添加 max_retries
和 retry_on_timeout
两个参数,就能实现超时自动重试了。
如果你直接看ES的文档,你可能会找不到这两个参数,如下图所示。
这并非是ES的文档有问题,而是因为这两个参数隐藏在 **kwargs
里面,如下图所示。
点进 Transport
就可以看到这两个参数:
以上是关于在ES批量插入数据超时时自动重试的主要内容,如果未能解决你的问题,请参考以下文章
Spark向Elasticsearch批量导入数据,出现重复的问题定位
java sqlserver 怎么做当某一笔数据超时时,自动更新该笔数据状态?