How to get more than 10K logs/results in Elasticsearch
【Posted】: 2021-07-15 19:23:00 【Question】: Suppose I have more than 10,000 logs/results in the latest version of Elasticsearch (7.13). How can I retrieve all of them? I was reading about scroll search results, but the documentation opens with:
We no longer recommend using the scroll API for deep pagination. If you need to preserve the index state while paging through more than 10,000 hits, use the search_after parameter with a point in time (PIT).
search_after does let you access more than 10,000 hits, but you need the point in time API to obtain a PIT (point in time) ID first, and then pass that ID along with the search_after parameter. In the Kibana console, running POST /YOUR PATTERN INDEX NAME*/_pit?keep_alive=1m returns that PIT ID. But how would you execute that command in NEST for the .NET client?
This only tells you what to do once you already have a PIT ID, not how to execute the POST command that obtains one. Is there a way to do this without having to go to Kibana -> Discover -> Dev Tools and run POST /customer-simulation-es-app-logs*/_pit?keep_alive=1m?
(customer-simulation-es-app-logs* is the name of my index pattern.)
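For reference, NEST does expose this endpoint directly, so a sketch of the equivalent call (assuming an already-configured ElasticClient named client and the index pattern from the question) would be:

```csharp
// Equivalent of: POST /customer-simulation-es-app-logs*/_pit?keep_alive=1m
var openPit = await client.OpenPointInTimeAsync(
    "customer-simulation-es-app-logs*",  // index pattern to open the PIT on
    p => p.KeepAlive("1m"));             // how long Elasticsearch keeps the PIT alive
var pitId = openPit.Id;                  // pass this ID to subsequent search requests
```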
Before implementing Rob's example, I had the following:
[HttpGet("GetMonthlyLogs")]
public async Task<List<EsSource>> GetLogsByDate()
{
    string indexName = "customer-simulation-es-app-logs*";
    var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
    connectionSettings.DefaultIndex(indexName);
    connectionSettings.EnableDebugMode();
    _elasticClient = new ElasticClient(connectionSettings);

    // this will return the number of results in the index based on the criteria below:
    var responseHits = _elasticClient.Count<EsSource>(c => c
        .Query(q => q
            .Bool(b => b
                .Should(
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Error")),
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Information")))
                .Filter(f => f.DateRange(dr => dr
                    .Field("@timestamp")
                    .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                    .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                .MinimumShouldMatch(1)))).Count;

    var response = await _elasticClient.SearchAsync<EsSource>(s => s
        .Size(3000) // must see about this
        .Source(src => src.Includes(i => i
            .Fields(f => f.timestamp,
                    f => f.level,
                    f => f.messageTemplate,
                    f => f.message)))
        .Index("customer-simulation-es-app-logs*")
        .Query(q => q
            .Bool(b => b
                .Should(
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Error")),
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Information")))
                .Filter(f => f.DateRange(dr => dr
                    .Field("@timestamp")
                    .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                    .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                .MinimumShouldMatch(1))));

    return response?.Documents.ToList();
}

public class EsSource
{
    [Date(Name = "@timestamp")]
    public DateTimeOffset timestamp { get; set; }
    public string level { get; set; }
    public string messageTemplate { get; set; }
    public string message { get; set; }
}
I gave Rob's example a try; my attempt is below. However, my question is: since my EsDocuments have no "Id", can I use timestamp instead? Is the foreach required? It's what groups the results into batches of 1000, right? Also, can I sort by timestamp, or does it have to be the result ID? Since I have no Id, I was thinking of creating another var searchResponse using the search API, then a variable named EsID, so I could loop over the search response's hits, like foreach (var item in searchResponse.Hits()) { EsID = item.Id; }, and then use it for the foreach with the batches (batches.Select(x => EsID)) and for sorting. But I feel that would be duplicated code... correct me if I'm wrong?
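To sketch what sorting by timestamp rather than an id could look like with search_after: since ES 7.12 a PIT search can add the implicit "_shard_doc" field as a tiebreaker, and each hit carries a Sorts collection to resume from, so no Id property is needed. The following is an illustrative sketch, not the accepted answer's code; it assumes the EsSource mapping and _elasticClient from the question and an already-opened PIT id in pit:

```csharp
// Sketch: paginate with search_after sorted on @timestamp.
// Timestamps can collide, so a tiebreaker sort is added; each hit's
// Sorts collection carries the values the next page resumes from.
object[] lastSort = null;
while (true)
{
    var page = await _elasticClient.SearchAsync<EsSource>(s => s
        .Size(1000)
        .PointInTime(pit, d => d.KeepAlive("1m"))
        .Sort(srt => srt
            .Ascending(f => f.timestamp)
            .Ascending("_shard_doc"))             // tiebreaker for equal timestamps
        .SearchAfter(lastSort));                  // null on the first page

    if (page.Documents.Count == 0)
        break;

    lastSort = page.Hits.Last().Sorts.ToArray(); // resume point for the next page
}
```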
Please see my implementation here:
private IElasticClient _elasticClient;

[HttpGet("GetMonthlyLogs")]
public async Task<List<EsSource>> GetLogsByDate()
{
    string indexName = "customer-simulation-es-app-logs*";
    var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
    connectionSettings.DefaultIndex(indexName);
    connectionSettings.EnableDebugMode();
    _elasticClient = new ElasticClient(connectionSettings);

    // this will return the number of results in the index based on the criteria
    var responseHits = _elasticClient.Count<EsSource>(c => c
        .Query(q => q
            .Bool(b => b
                .Should(
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Error")),
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Information")))
                .Filter(f => f.DateRange(dr => dr
                    .Field("@timestamp")
                    .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                    .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                .MinimumShouldMatch(1)))).Count;

    foreach (var batches in Enumerable.Range(0, (int)responseHits).Batch(1000))
    {
        var bulk = await _elasticClient.IndexManyAsync(batches.Select(x => new EsSource { /* can I use timestamp?? */ }));
    }
    await _elasticClient.Indices.RefreshAsync();

    var openPit = await _elasticClient.OpenPointInTimeAsync(indexName, d => d.KeepAlive("1m"));
    var pit = openPit.Id;

    var searchAfter = 0;
    try
    {
        while (true)
        {
            var response = await _elasticClient.SearchAsync<EsSource>(s => s
                .TrackTotalHits(false) // disable the tracking of total hits to speed up pagination
                .Size(1000)
                // pass pit id & extend lifetime of it by another minute
                .PointInTime(pit, d => d.KeepAlive("1m"))
                .Source(src => src.Includes(i => i
                    .Fields(f => f.timestamp,
                            f => f.level,
                            f => f.messageTemplate,
                            f => f.message)))
                .Query(q => q
                    .Bool(b => b
                        .Should(
                            m => m
                                .Match(ma => ma
                                    .Field(fa => fa.level)
                                    .Query("Error")),
                            m => m
                                .Match(ma => ma
                                    .Field(fa => fa.level)
                                    .Query("Information")))
                        .Filter(f => f.DateRange(dr => dr
                            .Field("@timestamp")
                            .GreaterThanOrEquals("2021-07-14T00:00:00.000-05:00")
                            .LessThanOrEquals("2021-07-14T23:59:59.999-05:00")))
                        .MinimumShouldMatch(1)))
                // can I sort with timestamp or does it have to be the result ID?
                .Sort(srt => srt.Ascending(f => f.timestamp))
                .SearchAfter(searchAfter));

            if (response.Documents.Count == 0)
                break;

            //searchAfter = response.Documents.LastOrDefault()?.timestamp ?? x;
        }
    }
    finally
    {
        // closing the pit
        var closePit = await _elasticClient.ClosePointInTimeAsync(d => d.Id(pit));
    }

    return // response?.Documents.ToList();
}

public class EsSource
{
    [Date(Name = "@timestamp")]
    public DateTimeOffset timestamp { get; set; }
    public string level { get; set; }
    public string messageTemplate { get; set; }
    public string message { get; set; }
}
【Question comments】:
【Answer 1】: I've prepared a sample application, with comments, that demonstrates how to retrieve all documents from an index and search them using a PIT.
class Program
{
    static async Task Main(string[] args)
    {
        string indexName = "test";
        var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
        connectionSettings.DefaultIndex(indexName);
        connectionSettings.EnableDebugMode();
        var elasticClient = new ElasticClient(connectionSettings);

        await elasticClient.Indices.DeleteAsync(indexName);
        var indexResponse = await elasticClient.Indices.CreateAsync(indexName);

        // index some test data
        // Batch coming from the morelinq nuget package
        Console.WriteLine("Index some data into index");
        foreach (var batches in Enumerable.Range(0, 20000).Batch(1000))
        {
            var bulk = await elasticClient.IndexManyAsync(batches.Select(x => new EsDocument { Id = x }));
        }
        await elasticClient.Indices.RefreshAsync();

        var countResponse = await elasticClient.CountAsync<EsDocument>(d => d);
        Console.WriteLine($"Documents in index: {countResponse.Count}");

        Console.WriteLine("Open new pit");
        var openPit = await elasticClient.OpenPointInTimeAsync(indexName, d => d.KeepAlive("1m"));
        var pit = openPit.Id;

        Console.WriteLine("Read all docs from index ..");
        // we will start reading docs from the beginning
        var searchAfter = 0;
        try
        {
            while (true)
            {
                var searchResponse = await elasticClient.SearchAsync<EsDocument>(s => s
                    // disable the tracking of total hits to speed up pagination.
                    .TrackTotalHits(false)
                    .Size(1000)
                    // pass pit id and extend lifetime of it by another minute
                    .PointInTime(pit, d => d.KeepAlive("1m"))
                    .Query(q => q.MatchAll())
                    // sort by the Id field so we can pass the last retrieved id to the next search
                    .Sort(sort => sort.Ascending(f => f.Id))
                    // pass last id we received from prev. search request so we can keep retrieving more documents
                    .SearchAfter(searchAfter));

                // if we didn't receive any docs just stop processing
                if (searchResponse.Documents.Count == 0)
                    break;

                Console.WriteLine(
                    $"Id [{searchResponse.Documents.FirstOrDefault()?.Id}..{searchResponse.Documents.LastOrDefault()?.Id}]");
                searchAfter = searchResponse.Documents.LastOrDefault()?.Id ?? 0;
            }
        }
        finally
        {
            Console.WriteLine("Close pit");
            var closePit = await elasticClient.ClosePointInTimeAsync(d => d.Id(pit));
        }
    }
}

class EsDocument
{
    public int Id { get; set; }
}
Prints:
Index some data into index
Documents in index: 20000
Open new pit
Read all docs from index ..
Id [1..1000]
Id [1001..2000]
Id [2001..3000]
Id [3001..4000]
Id [4001..5000]
Id [5001..6000]
Id [6001..7000]
Id [7001..8000]
Id [8001..9000]
Id [9001..10000]
Id [10001..11000]
Id [11001..12000]
Id [12001..13000]
Id [13001..14000]
Id [14001..15000]
Id [15001..16000]
Id [16001..17000]
Id [17001..18000]
Id [18001..19000]
Id [19001..19999]
Close pit
【Comments】:
Thanks for the sample! Will await elasticClient.Indices.DeleteAsync(indexName); var indexResponse = await elasticClient.Indices.CreateAsync(indexName); delete the existing logs and create new ones? Can I skip that part? I don't want to delete any existing logs.
Yes, that part can absolutely be skipped; it's only there to set up test data.
Awesome, I'll skip it, thanks!
If you don't mind, could you take a look at my updated post? I have a few things I need clarified and some guidance. Thanks! :)
You can replace it with DateTime.Min in your case. It doesn't really matter, because the program will never reach that point; we check searchResponse.Documents.Count == 0 above it.
【Answer 2】:
You need to add a PointInTime instance to your search query, like this:
esQuery.PointInTime = new PointInTime(PointInTimeId, KeepAlive);
On your first request to ES, your PointInTimeId will be null. For more information, check the official ES documentation here.
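For the object-initializer style this answer refers to, a sketch combining it with an explicitly opened PIT might look like the following (variable and index names are assumptions carried over from the question's code, and the two-argument PointInTime constructor mirrors the snippet above):

```csharp
// Open the PIT first (the "POST .../_pit" step), then attach it to the request.
var openPit = await _elasticClient.OpenPointInTimeAsync(
    "customer-simulation-es-app-logs*", p => p.KeepAlive("1m"));

var esQuery = new SearchRequest<EsSource>
{
    Size = 1000,
    PointInTime = new PointInTime(openPit.Id, "1m"),   // id + keep-alive
    Sort = new List<ISort>
    {
        new FieldSort { Field = "@timestamp", Order = SortOrder.Ascending }
    }
};
var response = await _elasticClient.SearchAsync<EsSource>(esQuery);
```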
【Comments】:
But how would you send the first request to get the PointInTimeId? That's my question...
You run whatever query you want and set PointInTime; the result you get back will give you the PointInTimeId.