How to get more than 10K logs/results in Elasticsearch

Posted: 2021-07-15 19:23:00

Question:

Suppose I have more than 10K logs/results in the latest version of Elasticsearch (7.13). How do I get all of them? I was reading about scroll search results, but right at the top it says:

We no longer recommend using the scroll API for deep pagination. If you need to preserve the index state while paging through more than 10,000 hits, use the search_after parameter with a point in time (PIT).

But the search_after docs say you can access more than 10,000 hits, provided you use the point in time API to obtain a PIT (point in time) ID and then pass that ID in the search_after request. In the Kibana console, if you enter the command POST /YOUR PATTERN INDEX NAME*/_pit?keep_alive=1m, it will return that PIT ID. But how would you execute that command in NEST for the .NET client?

This only tells you what to do once you already have a PIT ID, not how to execute the POST command that obtains one. Is there a way to do this without having to go to Kibana -> Discover -> Dev Tools and run the command POST /customer-simulation-es-app-logs*/_pit?keep_alive=1m (customer-sim being the name of my index)?
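For what it's worth, NEST exposes the point-in-time endpoints directly on the client, so no raw POST should be necessary. A minimal sketch, assuming an existing ElasticClient named elasticClient:

```csharp
// Equivalent of POST /customer-simulation-es-app-logs*/_pit?keep_alive=1m
var openPit = await elasticClient.OpenPointInTimeAsync(
    "customer-simulation-es-app-logs*", p => p.KeepAlive("1m"));
var pitId = openPit.Id; // pass this id into subsequent search_after requests

// ... page through results here ...

// Close the PIT when done to release server-side resources
await elasticClient.ClosePointInTimeAsync(c => c.Id(pitId));
```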


Before implementing Rob's example, I had the following:

[HttpGet("GetMonthlyLogs")]
public async Task<List<EsSource>> GetLogsByDate()
{
    string indexName = "customer-simulation-es-app-logs*";
    var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
    connectionSettings.DefaultIndex(indexName);
    connectionSettings.EnableDebugMode();
    _elasticClient = new ElasticClient(connectionSettings);

    // this will return the number of results in the index based on the criteria below:
    var responseHits = _elasticClient.Count<EsSource>(c => c
        .Query(q => q
            .Bool(b => b
                .Should(
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Error")),
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Information")))
                .Filter(f => f.DateRange(dr => dr
                    .Field("@timestamp")
                    .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                    .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                .MinimumShouldMatch(1)))).Count;

    var response = await _elasticClient.SearchAsync<EsSource>(s => s
        .Size(3000) // must see about this
        .Source(src => src.Includes(i => i
            .Fields(f => f.timestamp,
                    f => f.level,
                    f => f.messageTemplate,
                    f => f.message)))
        .Index("customer-simulation-es-app-logs*")
        .Query(q => q
            .Bool(b => b
                .Should(
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Error")),
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Information")))
                .Filter(f => f.DateRange(dr => dr
                    .Field("@timestamp")
                    .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                    .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                .MinimumShouldMatch(1))));

    return response?.Documents.ToList();
}

public class EsSource
{
    [Date(Name = "@timestamp")]
    public DateTimeOffset timestamp { get; set; }
    public string level { get; set; }
    public string messageTemplate { get; set; }
    public string message { get; set; }
}


I tried giving Rob's example an attempt; what I did is below. My question, though: since my EsDocuments have no "Id", can I use timestamp instead? Is the foreach required, i.e. is that what groups the results into batches of 1000? Also, can I sort by timestamp or does it have to be the result ID? Because I don't have an Id, I was thinking of creating another var searchResponse that uses the search API, plus a variable called EsID, so I could loop over the hits like foreach (var item in searchResponse.Hits()) { EsID = item.Id; }, then use it in the foreach with the batches (batches.Select(x => EsID)) and use it for sorting. But I feel like that would be duplicated code... correct me if I'm wrong?
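Concretely, the timestamp-based loop I have in mind would be something like this (an untested sketch; I'm assuming the hit's Sorts collection is the right way to read back the sort values for the next request):

```csharp
object[] lastSortValues = null;

while (true)
{
    var page = await _elasticClient.SearchAsync<EsSource>(s => s
        .Size(1000)
        .PointInTime(pit, d => d.KeepAlive("1m"))
        // sort on the timestamp instead of a numeric Id
        .Sort(srt => srt.Ascending(f => f.timestamp))
        // null on the first request; the last hit's sort values afterwards
        .SearchAfter(lastSortValues));

    if (page.Documents.Count == 0)
    {
        break;
    }

    // hits carry the raw sort values, so no separate EsID variable is needed
    lastSortValues = page.Hits.Last().Sorts.ToArray();
}
```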

Please see my implementation here:

private IElasticClient _elasticClient;

[HttpGet("GetMonthlyLogs")]
public async Task<List<EsSource>> GetLogsByDate()
{
    string indexName = "customer-simulation-es-app-logs*";
    var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
    connectionSettings.DefaultIndex(indexName);
    connectionSettings.EnableDebugMode();
    _elasticClient = new ElasticClient(connectionSettings);

    // this will return the number of results in the index based on the criteria
    var responseHits = _elasticClient.Count<EsSource>(c => c
        .Query(q => q
            .Bool(b => b
                .Should(
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Error")),
                    m => m
                        .Match(ma => ma
                            .Field(fa => fa.level)
                            .Query("Information")))
                .Filter(f => f.DateRange(dr => dr
                    .Field("@timestamp")
                    .GreaterThanOrEquals("2021-07-16T12:46:00.227-05:00")
                    .LessThanOrEquals("2021-07-16T12:55:00.227-05:00")))
                .MinimumShouldMatch(1)))).Count;

    foreach (var batches in Enumerable.Range(0, (int)responseHits).Batch(1000))
    {
        var bulk = await _elasticClient.IndexManyAsync(batches.Select(x => new EsSource { /* can I use timestamp?? */ }));
    }

    await _elasticClient.Indices.RefreshAsync();

    var openPit = await _elasticClient.OpenPointInTimeAsync(indexName, d => d.KeepAlive("1m"));
    var pit = openPit.Id;

    var searchAfter = 0;

    try
    {
        while (true)
        {
            var response = await _elasticClient.SearchAsync<EsSource>(s => s
                .TrackTotalHits(false) // disable the tracking of total hits to speed up pagination
                .Size(1000)
                // pass pit id & extend lifetime of it by another minute
                .PointInTime(pit, d => d.KeepAlive("1m"))
                .Source(src => src.Includes(i => i
                    .Fields(f => f.timestamp,
                            f => f.level,
                            f => f.messageTemplate,
                            f => f.message)))
                .Query(q => q
                    .Bool(b => b
                        .Should(
                            m => m
                                .Match(ma => ma
                                    .Field(fa => fa.level)
                                    .Query("Error")),
                            m => m
                                .Match(ma => ma
                                    .Field(fa => fa.level)
                                    .Query("Information")))
                        .Filter(f => f.DateRange(dr => dr
                            .Field("@timestamp")
                            .GreaterThanOrEquals("2021-07-14T00:00:00.000-05:00")
                            .LessThanOrEquals("2021-07-14T23:59:59.999-05:00")))
                        .MinimumShouldMatch(1)))
                // can I sort by timestamp or does it have to be the result ID?
                .Sort(srt => srt.Ascending(f => f.timestamp))
                .SearchAfter(searchAfter));

            if (response.Documents.Count == 0)
            {
                break;
            }

            //searchAfter = response.Documents.LastOrDefault()?.timestamp ?? x;
        }
    }
    finally
    {
        // closing the pit
        var closePit = await _elasticClient.ClosePointInTimeAsync(d => d.Id(pit));
    }

    return // response?.Documents.ToList();
}


public class EsSource
{
    [Date(Name = "@timestamp")]
    public DateTimeOffset timestamp { get; set; }
    public string level { get; set; }
    public string messageTemplate { get; set; }
    public string message { get; set; }
}


Answer 1:

I have prepared a sample application, with comments, that demonstrates how to retrieve all documents from an index using PIT and search_after.

class Program
{
    static async Task Main(string[] args)
    {
        string indexName = "test";
        var connectionSettings = new ConnectionSettings(new Uri("http://localhost:9200"));
        connectionSettings.DefaultIndex(indexName);
        connectionSettings.EnableDebugMode();
        var elasticClient = new ElasticClient(connectionSettings);

        await elasticClient.Indices.DeleteAsync(indexName);
        var indexResponse = await elasticClient.Indices.CreateAsync(indexName);

        // index some test data
        // Batch coming from the morelinq nuget
        Console.WriteLine("Index some data into index");
        foreach (var batches in Enumerable.Range(0, 20000).Batch(1000))
        {
            var bulk = await elasticClient.IndexManyAsync(batches.Select(x => new EsDocument { Id = x }));
        }

        await elasticClient.Indices.RefreshAsync();

        var countResponse = await elasticClient.CountAsync<EsDocument>(d => d);
        Console.WriteLine($"Documents in index: {countResponse.Count}");

        Console.WriteLine("Open new pit");
        var openPit = await elasticClient.OpenPointInTimeAsync(indexName, d => d.KeepAlive("1m"));
        var pit = openPit.Id;

        Console.WriteLine("Read all docs from index ..");
        // we will start reading docs from the beginning
        var searchAfter = 0;
        try
        {
            while (true)
            {
                var searchResponse = await elasticClient.SearchAsync<EsDocument>(s => s
                    // disable the tracking of total hits to speed up pagination.
                    .TrackTotalHits(false)
                    .Size(1000)
                    // pass pit id and extend lifetime of it by another minute
                    .PointInTime(pit, d => d.KeepAlive("1m"))
                    .Query(q => q.MatchAll())
                    // sort by Id field so we can pass the last retrieved id to the next search
                    .Sort(sort => sort.Ascending(f => f.Id))
                    // pass last id we received from prev. search request so we can keep retrieving more documents
                    .SearchAfter(searchAfter));

                // if we didn't receive any docs just stop processing
                if (searchResponse.Documents.Count == 0)
                {
                    break;
                }

                Console.WriteLine(
                    $"Id [{searchResponse.Documents.FirstOrDefault()?.Id}..{searchResponse.Documents.LastOrDefault()?.Id}]");
                searchAfter = searchResponse.Documents.LastOrDefault()?.Id ?? 0;
            }
        }
        finally
        {
            Console.WriteLine("Close pit");
            var closePit = await elasticClient.ClosePointInTimeAsync(d => d.Id(pit));
        }
    }

    class EsDocument
    {
        public int Id { get; set; }
    }
}

Prints:

Index some data into index
Documents in index: 20000
Open new pit
Read all docs from index ..
Id [1..1000]
Id [1001..2000]
Id [2001..3000]
Id [3001..4000]
Id [4001..5000]
Id [5001..6000]
Id [6001..7000]
Id [7001..8000]
Id [8001..9000]
Id [9001..10000]
Id [10001..11000]
Id [11001..12000]
Id [12001..13000]
Id [13001..14000]
Id [14001..15000]
Id [15001..16000]
Id [16001..17000]
Id [17001..18000]
Id [18001..19000]
Id [19001..19999]
Close pit

Discussion:

Thanks for providing the sample! Will await elasticClient.Indices.DeleteAsync(indexName); var indexResponse = await elasticClient.Indices.CreateAsync(indexName); delete my existing logs and create new ones? Can I skip that part? I don't want to delete any of my existing logs.

Yes, that part can absolutely be skipped; it is only there to set up test data.

Awesome, I'll skip it, thanks!

If you don't mind, could you take a look at my updated post? I have a few things that need clarification and guidance! Thanks! :)

You can replace it with DateTime.Min in your case. It doesn't really matter, because the program never reaches that point: we check searchResponse.Documents.Count == 0 above.

Answer 2:

You need to add a PointInTime instance to your search query, like this:

esQuery.PointInTime = new PointInTime(PointInTimeId, KeepAlive);

On your first request to ES, your PointInTimeId will be null. For more information, check the official ES documentation here.
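In other words, something like this rough sketch (esQuery stands in for however the search request object is built; the surrounding names are assumed):

```csharp
// First request: no PIT exists yet, so open one; later requests reuse the id
if (string.IsNullOrEmpty(pointInTimeId))
{
    var opened = await elasticClient.OpenPointInTimeAsync(indexName, d => d.KeepAlive("1m"));
    pointInTimeId = opened.Id;
}
esQuery.PointInTime = new PointInTime(pointInTimeId, "1m");
```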

Discussion:

But how would you send the first request for the PointInTimeId? That's my question...

You run the query you want and set the PointInTime; the result you get back gives you the PointInTimeId.
