ElasticSearch - 并行调用 UpdateByQuery 和 Update 导致 409 冲突
Posted
技术标签:
【中文标题】ElasticSearch - 并行调用 UpdateByQuery 和 Update 导致 409 冲突【英文标题】:ElasticSearch - calling UpdateByQuery and Update in parallel causes 409 conflicts 【发布时间】:2022-01-18 13:55:52 【问题描述】:使用大型索引(100,000 个文档),我有一个用例会产生多个尝试并行更新文档的线程,源代码使用两种方法来更新文档:Update
和 UpdateByQuery
,所以一些线程调用Update
,其中一些调用UpdateByQuery
。
为简洁起见,每个线程都尝试更新整个文档的相同属性。
这是一个演示用例的小型 POC:
索引 100,000 个 Product
类型的文档,并生成 100 个任务,因此每个任务并行调用 Update
和 UpdateByQuery
。他们都使用MatchAll
查询。
public async Task ConflictsTestAsync()
// index 100,000 documents.
IEnumerable<Product> products = CreateProducts(100000);
await _client.IndexManyAsync(products);
await _client.Indices.UpdateSettingsAsync("MyIndex", s => s
.IndexSettings(i => i.Setting(UpdatableIndexSettings.MaxResultWindow, 100000)));
var searchResponse = _client.Search<Product>(s => s
.From(0)
.Size(100000)
.Query(q => q.MatchAll())
);
IReadOnlyCollection<IHit<Product>> getProducts = searchResponse.Hits;
List<Task> tasks = new List<Task>();
for (int i = 0; i < 100; i++)
tasks.Add(UpdateByQuery(getProducts));
tasks.Add(Update(getProducts));
await Task.WhenAll(tasks);
public class Product
public string Price get; set;
public string Id get; set;
UpdateByQuery(将价格更新为 0):
private async Task UpdateByQuery()
Func<UpdateByQueryDescriptor<Product>, IUpdateByQueryRequest> updateByQuerySelector = (UpdateByQueryDescriptor<Product> updateByQueryDescriptor) =>
updateByQueryDescriptor
.Conflicts(Conflicts.Abort)
.ErrorTrace()
.Query(x => x.MatchAll())
.Script(x => x.Source("ctx._source['price'] = '0'"));
IUpdateByQueryRequest result = updateByQueryDescriptor;
return result;
;
await _client.UpdateByQueryAsync(updateByQuerySelector, CancellationToken.None);
更新(更新价格为1):
private async Task Update(IReadOnlyCollection<IHit<Product>> keys)
foreach (IHit<Product> product in keys)
DocumentPath<Product> id = new DocumentPath<Product>(product.Id);
Func<UpdateDescriptor<Product, Product>, IUpdateRequest<Product, Product>> updateSelector = (UpdateDescriptor<Product, Product> updateDescriptor) =>
var page = product.Source;
page.Price = "1";
updateDescriptor.Doc(page);
IUpdateRequest<Product, Product> result = updateDescriptor;
return result;
;
await _client.UpdateAsync<Product>(id, updateSelector);
问题是产生多个 Query 线程和 UpdateByQuery 导致冲突异常:
一些Update
抛出:
从不成功的 (409) 低级别调用构建的无效 NEST 响应 在 POST 上:/MyIndex/_update/d75a34ae-2533-4e15-a852-13e98c5b599c 此 API 调用的审计跟踪:
[1] 错误响应:节点:http://localhost:9200/ 接受:00:00:00.2495465 OriginalException:Elasticsearch.Net.ElasticsearchClientException:请求执行失败。呼叫:状态码 409 来自:POST /MyIndex/_update/d75a34ae-2533-4e15-a852-13e98c5b599c。 ServerError:类型:version_conflict_engine_exception 原因: "[d75a34ae-2533-4e15-a852-13e98c5b599c]:版本冲突,必填 seqNo [701069],主要术语 [1]。当前文档有 seqNo [702042] 和主要术语 [1]" 请求:"doc":"id":"d75a34ae-2533-4e15-a852-13e98c5b599c","manufacturer":"777e1602-8390-40c8-817e-fdef4e3fb9c0","price":"1","title ":"31184e90-f1d1-45be-8746-496a50de2f97","描述":"780cc1ab-0a8b-4114-a840-67a528de8e55" 响应:"error":"root_cause":["type":"version_conflict_engine_exception","reason":"[d75a34ae-2533-4e15-a852-13e98c5b599c]: 版本冲突,需要 seqNo [701069],主要术语 [1]。当前的 文档有 seqNo [702042] 和主要术语 [1]","index_uuid":"gFAfYHfNQZG0tD0L2a9yag","shard":"0","index":"MyIndex"],"type":"version_conflict_engine_exception","reason":"[d75a34ae-2533-4e15 -a852-13e98c5b599c]: 版本冲突,需要 seqNo [701069],主要术语 [1]。当前的 文档有 seqNo [702042] 和主要术语 [1]","index_uuid":"gFAfYHfNQZG0tD0L2a9yag","shard":"0","index":"MyIndex","status":409
还有一些UpdateByQuery
抛出(我已经清理了失败数组):
从不成功的 (409) 低级别调用构建的无效 NEST 响应 在 POST 上: /MyIndex/_update_by_query?conflicts=abort&error_trace=true 此 API 调用的审计跟踪:
[1] 错误响应:节点:http://localhost:9200/ 接受:00:00:00.2636546 OriginalException:Elasticsearch.Net.ElasticsearchClientException:请求执行失败。呼叫:状态码 409 来自:POST /MyIndex/_update_by_query?conflicts=abort&error_trace=true 请求:"query":"match_all":,"script":"source":"ctx._source['price'] = '0'" 响应:"took":120,"timed_out":false,"total":100000,"updated":0,"deleted":0,"batches":1,"version_conflicts":1000,"noops":0 ,"重试次数":"bulk":0,"search":0,"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":["index":"MyIndex ","type":"_doc","id":"d5fb4183-4ff4-43c9-962c-ee9d0ee59a6b","cause":"type":"version_conflict_engine_exception","reason":"[d5fb4183-4ff4-43c9 -962c-ee9d0ee59a6b]: 版本冲突,需要seqNo [0],主要术语[1]。当前的 文档有 seqNo [100000] 和主要术语 [1]","index_uuid":"pPDFKhj6T4y-MpzYECKpxQ","shard":"0"]
因为性能很重要,我不希望放弃 UpdateByQuery
(并且仅使用 Update
)或对方法访问进行互斥,任何处理这种情况的建议都将不胜感激。
更新:
数据完整性很重要:.Conflicts(Conflicts.Abort)
弹性搜索:7.10.0。
【问题讨论】:
使用seq_no
和 primary_term
进行乐观并发控制:elastic.co/guide/en/elasticsearch/reference/7.16/…
【参考方案1】:
TL;DR:
如果您希望它在遇到冲突时继续工作,您可以将conflicts=proceed
传递给update_by_query
API。
更多详情: update_by_query page 解释:
当您通过查询请求提交更新时,Elasticsearch 会收到一个 数据流或索引开始处理时的快照 使用内部版本控制请求和更新匹配的文档。什么时候 版本匹配,文档更新,版本号为 递增。如果文档在快照之间发生更改 被采取并处理更新操作,它会导致 版本冲突,操作失败。您可以选择计算版本 冲突而不是通过设置冲突来停止和返回 继续。
所以基本上,您的 update
和 update_by_query
正在尝试更新相同的文档,彼此冲突。使用 conflicts=proceed
会使该操作说“哦,好吧,我将继续更新其他文档”。
【讨论】:
嘿,Doron,谢谢,但我不能不更新一些文档,这会损害我的数据完整性 所以你需要确保每次调用都会更新不同的文档 顺便说一句,100k 文件并不多。通过查询调用进行一次更新应该没问题。您可以添加 slices=auto 以使其更快。 ES 将为您完成并行化工作的艰巨工作以上是关于ElasticSearch - 并行调用 UpdateByQuery 和 Update 导致 409 冲突的主要内容,如果未能解决你的问题,请参考以下文章
windows下fluentd传输日志到elasticsearch (fluentd elasticsearch https)