python中的Elasticsearch parallel_bulk辅助函数在使用响应时会因部分失败而引发错误

Posted

技术标签:

【中文标题】python中的Elasticsearch parallel_bulk辅助函数在使用响应时会因部分失败而引发错误【英文标题】:Elasticsearch parallel_bulk helper function in python is throwing error for a partial failure when consuming response 【发布时间】:2022-01-09 15:57:28 【问题描述】:

我正在尝试在 python 中使用 elasticsearch parallel_bulk 函数测试部分失败场景。 我有一个具有以下映射的索引

PUT test

  "mappings": 
    "dynamic": "strict",
    "properties": 
      "random": 
        "type": "keyword"
      
    
  

我编写了下面的代码,我尝试使用一个具有不在映射中的额外字段的文档进行批量插入,并且由于映射是严格的,因此该文档的插入应该失败

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

def index_bulk(payload: list, index_name: str):
    for data in payload:
        yield 
            '_op_type': 'index',
            '_index': index_name,
            '_source': data
        

conn_params = 
    'hosts': ['host': 'localhost', 'port': 9200],

es = Elasticsearch(**conn_params)
docs = [ "random": f"value_i"  for i in range(3)]
docs.append(
    "random": "value",
    "field": "test"
)

response = parallel_bulk(
    client=es,
    actions=index_bulk(docs, 'test')
)

上面的代码可以正常工作并且不会抛出任何错误。但是现在我尝试使用响应生成器来查看使用以下代码是否有任何故障

for success, info in response:
    if not success: print('Doc failed', info)

而不是打印失败的文档,而是在下面抛出错误

Traceback (most recent call last):
  File "test_bulk_es.py", line 28, in <module>
    for success, info in response:
  File "C:\Users\username\project\venv\lib\site-packages\elasticsearch\helpers\actions.py", line 365, in parallel_bulk 
    actions, chunk_size, max_chunk_bytes, client.transport.serializer
  File "C:\Users\username\AppData\Local\Programs\Python\Python36\Lib\multiprocessing\pool.py", line 735, in next
    raise value
  File "C:\Users\username\AppData\Local\Programs\Python\Python36\Lib\multiprocessing\pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "C:\Users\username\project\venv\lib\site-packages\elasticsearch\helpers\actions.py", line 361, in <lambda>      
    client, bulk_chunk[1], bulk_chunk[0], *args, **kwargs
  File "C:\Users\username\project\venv\lib\site-packages\elasticsearch\helpers\actions.py", line 158, in _process_bulk_chunk
    raise BulkIndexError("%i document(s) failed to index." % len(errors), errors)
elasticsearch.helpers.errors.BulkIndexError: ('1 document(s) failed to index.', ['index': '_index': 'test', '_type': '_doc', '_id': 'km93f30Ba79etmZE_XiS', 'status': 400, 'error': 'type': 'strict_dynamic_mapping_exception', 'reason': 'mapping set to strict, dynamic introduction of [field] within [_doc] is not allowed', 'data': 'random': 'value', 'field': 'test'])

我不希望它抛出任何错误。我只是想知道是否有任何部分失败,我假设这就是我在documentation中读到的上述代码的工作原理@

我是不是做错了什么?

我正在使用elasticsearch==7.1.0

【问题讨论】:

【参考方案1】:

您应该在False 上设置raise_on errorraise_on_exception,如documentation 中所述。在这种情况下会引发任何问题,因此您只能通过以下方式检查故障:

for success, info in helpers.parallel_bulk(
    client=es,
    actions=index_bulk(docs, 'test'),
    raise_on_error=False,
    raise_on_exception=False):
    if not success:
         print(info)

【讨论】:

以上是关于python中的Elasticsearch parallel_bulk辅助函数在使用响应时会因部分失败而引发错误的主要内容,如果未能解决你的问题,请参考以下文章

通过 DataFrame 将数据导入到 ElasticSearch。

如何使用 python 编辑保存在 Elasticsearch 中的文档

python 随机par ou impar(功能示例)

python 随机par ou impar(功能示例)

Elasticsearch:Anaylyzer 在 Python 中的运用

Elasticsearch:Analyzer 在 Python 中的运用