ElasticSearch - 并行调用 UpdateByQuery 和 Update 导致 409 冲突

Posted

技术标签:

【中文标题】ElasticSearch - 并行调用 UpdateByQuery 和 Update 导致 409 冲突【英文标题】:ElasticSearch - calling UpdateByQuery and Update in parallel causes 409 conflicts 【发布时间】:2022-01-18 13:55:52 【问题描述】:

使用大型索引(100,000 个文档),我有一个用例会产生多个尝试并行更新文档的线程,源代码使用两种方法来更新文档:UpdateUpdateByQuery,所以一些线程调用Update,其中一些调用UpdateByQuery。 为简洁起见,每个线程都尝试更新整个文档的相同属性。

这是一个演示用例的小型 POC:

索引 100,000 个 Product 类型的文档,并生成 100 个任务,因此每个任务并行调用 UpdateUpdateByQuery。他们都使用MatchAll查询。

public async Task ConflictsTestAsync()

    // index 100,000 documents.
    IEnumerable<Product> products = CreateProducts(100000);
    await _client.IndexManyAsync(products);

    await _client.Indices.UpdateSettingsAsync("MyIndex", s => s
        .IndexSettings(i => i.Setting(UpdatableIndexSettings.MaxResultWindow, 100000)));

    var searchResponse = _client.Search<Product>(s => s
        .From(0)
        .Size(100000)
        .Query(q => q.MatchAll())
    );
    IReadOnlyCollection<IHit<Product>> getProducts = searchResponse.Hits;

    List<Task> tasks = new List<Task>();
    for (int i = 0; i < 100; i++)
    
        tasks.Add(UpdateByQuery(getProducts));
        tasks.Add(Update(getProducts));
    

    await Task.WhenAll(tasks);


public class Product

    public string Price  get; set; 
    public string Id  get; set; 

UpdateByQuery(将价格更新为 0):

private async Task UpdateByQuery()

    Func<UpdateByQueryDescriptor<Product>, IUpdateByQueryRequest> updateByQuerySelector = (UpdateByQueryDescriptor<Product> updateByQueryDescriptor) =>
    
        updateByQueryDescriptor
            .Conflicts(Conflicts.Abort)
            .ErrorTrace()
            .Query(x => x.MatchAll())
            .Script(x => x.Source("ctx._source['price'] = '0'"));               

        IUpdateByQueryRequest result = updateByQueryDescriptor;
        return result;
    ;

    await _client.UpdateByQueryAsync(updateByQuerySelector, CancellationToken.None);

更新(更新价格为1):

private async Task Update(IReadOnlyCollection<IHit<Product>> keys)

    foreach (IHit<Product> product in keys)
    
        DocumentPath<Product> id = new DocumentPath<Product>(product.Id);
        Func<UpdateDescriptor<Product, Product>, IUpdateRequest<Product, Product>> updateSelector = (UpdateDescriptor<Product, Product> updateDescriptor) =>
        
            var page = product.Source;
            page.Price = "1";

            updateDescriptor.Doc(page);

            IUpdateRequest<Product, Product> result = updateDescriptor;

            return result;
        ;

        await _client.UpdateAsync<Product>(id, updateSelector);
    

问题是产生多个 Query 线程和 UpdateByQuery 导致冲突异常:

一些Update 抛出:

从不成功的 (409) 低级别调用构建的无效 NEST 响应 在 POST 上:/MyIndex/_update/d75a34ae-2533-4e15-a852-13e98c5b599c 此 API 调用的审计跟踪:

[1] 错误响应:节点:http://localhost:9200/ 接受:00:00:00.2495465 OriginalException:Elasticsearch.Net.ElasticsearchClientException:请求执行失败。呼叫:状态码 409 来自:POST /MyIndex/_update/d75a34ae-2533-4e15-a852-13e98c5b599c。 ServerError:类型:version_conflict_engine_exception 原因: "[d75a34ae-2533-4e15-a852-13e98c5b599c]:版本冲突,必填 seqNo [701069],主要术语 [1]。当前文档有 seqNo [702042] 和主要术语 [1]" 请求:"doc":"id":"d75a34ae-2533-4e15-a852-13e98c5b599c","manufacturer":"777e1602-8390-40c8-817e-fdef4e3fb9c0","price":"1","title ":"31184e90-f1d1-45be-8746-496a50de2f97","描述":"780cc1ab-0a8b-4114-a840-67a528de8e55" 响应:"error":"root_cause":["type":"version_conflict_engine_exception","re​​ason":"[d75a34ae-2533-4e15-a852-13e98c5b599c]: 版本冲突,需要 seqNo [701069],主要术语 [1]。当前的 文档有 seqNo [702042] 和主要术语 [1]","index_uuid":"gFAfYHfNQZG0tD0L2a9yag","shard":"0","index":"MyIndex"],"type":"version_conflict_engine_exception","re​​ason":"[d75a34ae-2533-4e15 -a852-13e98c5b599c]: 版本冲突,需要 seqNo [701069],主要术语 [1]。当前的 文档有 seqNo [702042] 和主要术语 [1]","index_uuid":"gFAfYHfNQZG0tD0L2a9yag","shard":"0","index":"MyIndex","status":409

还有一些UpdateByQuery 抛出(我已经清理了失败数组):

从不成功的 (409) 低级别调用构建的无效 NEST 响应 在 POST 上: /MyIndex/_update_by_query?conflicts=abort&error_trace=true 此 API 调用的审计跟踪:

[1] 错误响应:节点:http://localhost:9200/ 接受:00:00:00.2636546 OriginalException:Elasticsearch.Net.ElasticsearchClientException:请求执行失败。呼叫:状态码 409 来自:POST /MyIndex/_update_by_query?conflicts=abort&error_trace=true 请求:"query":"match_all":,"script":"source":"ctx._source['price'] = '0'" 响应:"took":120,"timed_out":false,"total":100000,"updated":0,"deleted":0,"batches":1,"version_conflicts":1000,"noops":0 ,"重试次数":"bulk":0,"search":0,"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":["index":"MyIndex ","type":"_doc","id":"d5fb4183-4ff4-43c9-962c-ee9d0ee59a6b","cause":"type":"version_conflict_engine_exception","re​​ason":"[d5fb4183-4ff4-43c9 -962c-ee9d0ee59a6b]: 版本冲突,需要seqNo [0],主要术语[1]。当前的 文档有 seqNo [100000] 和主要术语 [1]","index_uuid":"pPDFKhj6T4y-MpzYECKpxQ","shard":"0"]

因为性能很重要,我不希望放弃 UpdateByQuery(并且仅使用 Update)或对方法访问进行互斥,任何处理这种情况的建议都将不胜感激。

更新: 数据完整性很重要:.Conflicts(Conflicts.Abort)

弹性搜索:7.10.0。

【问题讨论】:

使用 seq_noprimary_term 进行乐观并发控制:elastic.co/guide/en/elasticsearch/reference/7.16/… 【参考方案1】:

TL;DR: 如果您希望它在遇到冲突时继续工作,您可以将conflicts=proceed 传递给update_by_query API。

更多详情: update_by_query page 解释:

当您通过查询请求提交更新时,Elasticsearch 会收到一个 数据流或索引开始处理时的快照 使用内部版本控制请求和更新匹配的文档。什么时候 版本匹配,文档更新,版本号为 递增。如果文档在快照之间发生更改 被采取并处理更新操作,它会导致 版本冲突,操作失败。您可以选择计算版本 冲突而不是通过设置冲突来停止和返回 继续。

所以基本上,您的 updateupdate_by_query 正在尝试更新相同的文档,彼此冲突。使用 conflicts=proceed 会使该操作说“哦,好吧,我将继续更新其他文档”。

【讨论】:

嘿,Doron,谢谢,但我不能不更新一些文档,这会损害我的数据完整性 所以你需要确保每次调用都会更新不同的文档 顺便说一句,100k 文件并不多。通过查询调用进行一次更新应该没问题。您可以添加 slices=auto 以使其更快。 ES 将为您完成并行化工作的艰巨工作

以上是关于ElasticSearch - 并行调用 UpdateByQuery 和 Update 导致 409 冲突的主要内容,如果未能解决你的问题,请参考以下文章

windows下fluentd传输日志到elasticsearch (fluentd elasticsearch https)

二十七、ElasticSearch聚合分析中的算法讲解

Elasticsearch数据迁移--elasticdump

Elasticsearch: nested对象

Elasticsearch性能优化实战指南

Elasticsearch性能优化实战指南