用于无限结果的 RavenDB 流 - 连接弹性

Posted

技术标签:

【中文标题】用于无限结果的 RavenDB 流 - 连接弹性【英文标题】:RavenDB Stream for Unbounded Results - Connection Resilience 【发布时间】:2014-12-31 13:40:24 【问题描述】:

我们使用 RavenDB 中的 Stream 功能在 2 个数据库之间加载、转换和迁移数据,如下所示:

var query = originSession.Query<T>(IndexForQuery);

using (var stream = originSession.Advanced.Stream(query))

    while (stream.MoveNext())
    
        var streamedDocument = stream.Current.Document;

        OpenSessionAndMigrateSingleDocument(streamedDocument);
    

问题是其中一个集合有数百万行,我们不断收到以下格式的IOException

Application: MigrateToNewSchema.exe
Framework Version: v4.0.30319
Description: The process was terminated due to an unhandled exception.
Exception Info: System.IO.IOException
Stack:
   at System.Net.ConnectStream.Read(Byte[], Int32, Int32)
   at System.IO.Compression.DeflateStream.Read(Byte[], Int32, Int32)
   at System.IO.Compression.GZipStream.Read(Byte[], Int32, Int32)
   at System.IO.StreamReader.ReadBuffer(Char[], Int32, Int32, Boolean ByRef)
   at System.IO.StreamReader.Read(Char[], Int32, Int32)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadData(Boolean, Int32)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadStringIntoBuffer(Char)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseString(Char)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseValue()
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadInternal()
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.Read()
   at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Json.Linq.RavenJToken.ReadFrom(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Client.Connection.ServerClient+<YieldStreamResults>d__6b.MoveNext()
   at Raven.Client.Document.DocumentSession+<YieldQuery>d__c`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext()
   at MigrateToNewSchema.Migrator.DataMigratorBase`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MigrateCollection()
   at MigrateToNewSchema.Program.MigrateData(MigrateToNewSchema.Enums.CollectionToMigrate, Raven.Client.IDocumentStore, Raven.Client.IDocumentStore)
   at MigrateToNewSchema.Program.Main(System.String[])

这种情况在流式传输中发生了很长一段时间,当然在这种情况下会出现短暂的连接问题(需要几个小时才能完成)。

但是,当我们重试时,因为我们使用的是Query,所以我们必须从头开始。所以最终如果在整个Stream 过程中出现连接失败,那么我们必须再试一次,直到它端到端工作。

我知道您可以将ETag 与流一起使用以在某个时间点有效地重新启动,但是使用Query 执行此操作不会过载,我们需要过滤正在迁移的结果并指定正确的集合。

那么,在 RavenDB 中,有没有办法提高连接的内部弹性(连接字符串属性、内部设置等)或有效地“恢复”错误流?

【问题讨论】:

我发现了Data Subscriptions,这是 RavenDb 3.0 的一项功能,它提供了一种可靠的机制来遍历符合指定条件的文档集合,并让您可以轻松地从中断的地方继续。如果有人愿意整理一些代码示例来展示该功能如何回答这个问题,我会认为这是值得的。 您是否习惯于使用查询?虽然效率会更低,但这是一个迁移,所以内存不是问题——为什么不迭代原始文档集合并在内存中过滤,这样您就可以在 Etag 处恢复?这就是我处理所有流媒体的方式,我从不使用查询。 @StriplingWarrior 已经有一段时间了 :-) 我不再为使用 RavenDB 的公司工作,但这仍然让我感兴趣,所以我今天会用数据订阅代码回答跨度> 【参考方案1】:

根据@StriplingWarrior 的建议,我使用Data Subscriptions 重新创建了解决方案。

使用这种方法,我能够迭代所有 200 万行(尽管每个项目的处理量要少得多);当我们尝试使用 Streams 实现相同的逻辑时,这里有 2 点会有所帮助:

    只有在确认后才会从订阅“队列”中删除批次(与大多数标准队列一样)
      订阅的IObserver&lt;T&gt; 必须成功完成才能设置此确认。 此信息由服务器而不是客户端处理,因此允许客户端重新启动,而不会影响订阅中处理的最后一个成功位置 See here for more details
    正如@StriplingWarrior 所指出的,因为您可以使用过滤器创建订阅,直到属性级别,如果订阅本身发生异常,则可以使用较小的结果集进行重播。
      第一点确实取代了这一点;但它为我们提供了 Stream API 中没有的额外灵活性

测试环境是 RavenDB 3.0 数据库(本地机器,作为 Windows 服务运行),默认设置针对 200 万条记录的集合。

生成虚拟记录的代码:

using (IDocumentStore store = GetDocumentStore())

    store.Initialize();

    using (var bulkInsert = store.BulkInsert())
    
        for (var i = 0; i != recordsToCreate; i++)
        
            var person = new Person
            
                Id = Guid.NewGuid(),
                Firstname = NameGenerator.GenerateFirstName(),
                Lastname = NameGenerator.GenerateLastName()
            ;

            bulkInsert.Store(person);
        
    

订阅此集合是以下情况:

using (IDocumentStore store = GetDocumentStore())

    store.Initialize();

    var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>());

    var personSubscription = store.Subscriptions.Open<Person>(
        subscriptionId, new SubscriptionConnectionOptions()
    
        BatchOptions = new SubscriptionBatchOptions()
        
            // Max number of docs that can be sent in a single batch
            MaxDocCount = 16 * 1024,  
            // Max total batch size in bytes
            MaxSize = 4 * 1024 * 1024,
            // Max time the subscription needs to confirm that the batch
            // has been successfully processed
            AcknowledgmentTimeout = TimeSpan.FromMinutes(3)
        ,
        IgnoreSubscribersErrors = false,
        ClientAliveNotificationInterval = TimeSpan.FromSeconds(30)
    );

    personSubscription.Subscribe(new PersonObserver());

    while (true)
    
        Thread.Sleep(TimeSpan.FromMilliseconds(500));
    

注意PersonObserver;这只是 IObserver 的一个基本实现,如下所示:

public class PersonObserver : IObserver<Person>

    public void OnCompleted()
    
        Console.WriteLine("Completed");
    

    public void OnError(Exception error)
    
        Console.WriteLine("Error occurred: " + error.ToString());
    

    public void OnNext(Person person)
    
        Console.WriteLine($"Received 'person.Firstname person.Lastname'");
    

【讨论】:

不错的文章。我发现传入Task(或基于给定CancellationToken 创建Task)和await 任务而不是while(true) 很有用。这样,调用代码可以安全地取消操作,而不会杀死整个线程或进程。我还提出了一种基于 ETag 的机制,以帮助迁移知道何时“完成”并命中所有目标文档,因此它可以自行停止,但它非常棘手,并不适用于所有目的。

以上是关于用于无限结果的 RavenDB 流 - 连接弹性的主要内容,如果未能解决你的问题,请参考以下文章

#yyds干货盘点# RavenDB 文档建模--琐碎的注意事项--处理无限增长的文档

从嵌入式 RavenDB 中的索引检索结果时出现问题

EventStore 和 RavenDB 持久性 JsonReaderException

RavenDB 不使用时不允许删除

我可以在 RavenDB 中使用构造函数重载吗?

Azure 流分析作业无限运行