创建一个 IObservable<T> 正确返回大量数据(异步?)

Posted

技术标签:

【中文标题】创建一个 IObservable<T> 正确返回大量数据(异步?)【英文标题】:Creating an IObservable<T> that returns properly a ton of data (asynchronously?) 【发布时间】:2021-12-14 22:13:39 【问题描述】:

我对@9​​87654321@ 不是很熟悉,但我正在使用的一个包以某种方式强加给我。

我预计会返回一个IObservable&lt;T&gt;,稍后会在其上调用订阅,然后立即处理其结果。我的意图是用来自海量数据集的数据填充它。它从文本文件中逐行读取 GB 的数据。

但我似乎无法找到一个很好的例子来说明如何将我的一小时的 while 循环变成一个可观察的,并且它不希望预先读取所有数据。 我看到Observable.FromAsync 有一些变体,但任务也不是我的强项,似乎也无法让它发挥作用。

到目前为止,我取得的最好成绩如下。哪个编译并运行,但绝对什么都不做。据我所知,从不调用Create 中的代码。

public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)

    return Observable.Create<Data>(async (subject, token) =>
    
        try
        
            FileStream stream = null;
            StreamReader sr = null;
            DateTime date = startDate;
            string path = string.Format("C:\\MYPATH\\0-1-2.csv", date.Year,
                date.Month.ToString("00"), date.Day.ToString("00"));
            while (date < endDate)
            
                if (!File.Exists(path))
                
                    date.AddDays(1);
                    continue;
                
                stream = File.Open(path, FileMode.Create, FileAccess.Read);
                sr = new StreamReader(stream);
                while (!sr.EndOfStream)
                
                    string line = await sr.ReadLineAsync();
                    Data d = ParseData(line);
                    subject.OnNext(d);
                
                if (stream != null)
                
                    sr.Close();
                    stream.Close();
                
            
        
        catch (Exception ex)
        
            try
            
                subject.OnError(ex);
            
            catch (Exception)
            
                Console.WriteLine("An exception was thrown while trying to call" +
                    " OnError on the observable subject -- means you're not" +
                    " catching exceptions");
                throw;
            
        
    ).Publish();

我什至不确定我想做的事情在技术上是否可行,因为我不确定 Observable 模式是如何工作的。但是由于上下文的原因,它似乎期望服务器连接通常会为其提供DataStream。所以我认为这是可以做到的。但只有正确组合 Observable 创建方法。

如果有人有一些好的文档可供阅读,以对初学者友好的方式解释这一点,那也很好。

根据要求,下面如何调用该方法,但它主要进入黑盒库。

IObservable<Data> data = GetHistoricalData(
    new DateTime(2021, 1, 1, 0, 0, 0, DateTimeKind.Utc),
    new DateTime(2021, 1, 5, 0, 0, 0, DateTimeKind.Utc));

// Build charts from data
IObservable<(Data, Chart)> dataWithChart = data.GenerateCharts(TimeFrame);

// Generate signals from the charts
IObservable<(Signal, Chart)> signalsWithChart = dataWithChart.GenerateSignals(
    Symbol, strategy);

// We're interested in the signals only
IObservable<Signal> signals = signalsWithChart.SelectSignals();

// Show information from each signal
IDisposable subscription = signals.Subscribe(ShowSignal);
subscription.Dispose();

【问题讨论】:

根据我的经验,使用Observable.Create 几乎总是错误的方式。它会创建可能导致竞争条件的可观察对象,并且您会陷入程序性而非功能性术语的思考中。 您能否也包括如何调用GetHistoricalData 方法?是这样使用的吗? var subscription = GetHistoricalData(date1, date2); subscription.Dispose(); @TheodorZoulias 按要求添加了它,但我最初并没有发布它,因为我认为它没有增加太多价值,因为它通过了 5 次黑匣子。 嗯,所以您说GenerateChartsGenerateSignalsSelectSignalsShowSignal 方法都不受您控制?您不能修改这些方法中的任何一个吗?你至少有这些方法的源代码吗? 如果我进行逆向工程,是的。否则,我真的无能为力。 【参考方案1】:

我认为你应该阅读"Introduction to Rx",尤其是关于冷热可观察的部分,以及publish and connect。

您的代码存在几个较小的问题。 我为您的代码制作了一个更简单的版本,在我删除了对.Publish() 的调用后它就可以工作了。而且我相当肯定你不想在这里。

Publish 制作了一个支持多个观察者的包装器。您可以使用Publish() 使其适用于多个观察者,然后在所有观察者都订阅后调用Connect。但发布更适合“热”流,例如鼠标/键盘事件。您的数据只会被读取一次。在您调用 Connect 之后连接的任何 observable 都不会获得已经读取的数据。如果你需要这样的多个订阅者,我会返回IConnectableObservable 而不是IObservable,这样一旦所有观察者都订阅了,你就可以Connect()

至于你的代码:

保持简单。每当您使用流时,请使用 using,除非您非常清楚为什么不应该这样做。 计算循环内的路径。使用字符串插值代替string.Format()。 最后拨打subject.OnCompleted()。 重新分配date 变量。 DateTime c# 中的值是不可变的。 date.AddDays() 不会修改date,而是返回一个新的DateTime。 使用您真正需要的文件模式。如果您不打算在文件不存在的情况下调用代码,则不需要 .Create。

这对我有用:

public IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)

    return Observable.Create<Data>(async (subject, token) =>
    
        var date = startDate;
        try
        
            while (date < endDate)
            
                string path = $@"C:\MYPATH\date.Year-date.Month:00-date.Day:00.csv";

                if (File.Exists(path))
                
                    using (var stream = File.Open(path, FileMode.Open, FileAccess.Read))
                    using (var sr = new StreamReader(stream))
                    
                        while (!sr.EndOfStream)
                        
                            var line = await sr.ReadLineAsync();
                            if (!string.IsNullOrWhiteSpace(line))
                            
                                var data = ParseData(line);
                                subject.OnNext(data);
                            
                        
                    
                

                date = date.AddDays(1);
            
         catch (Exception e)
        
            subject.OnError(e);
        
        
        subject.OnCompleted();
    );

【讨论】:

你需要在OnError之后退出while。出错后无​​法继续循环。 好吧,至少,它现在正在工作,并且正在处理该方法。它在从第一个文件中读取了 100 行之后就会出错,但这可能不完全是这段代码的错。感谢您指出菜鸟的错误,我现在完全没有专注于这一点(我也忘记在 EOF 被击中后添加一天)我会在您提供的链接中阅读更多内容,然后从那里开始。再次感谢! 当然,我需要确保我的 Main 方法一直运行,直到所有文件处理完成。现在它工作正常。 感谢@Enigmativity,这是在将我的测试代码移动到 SO 时搞砸的。【参考方案2】:

您的代码似乎被一个滥用IObservable&lt;T&gt; monad 的包所使用,将其视为IEnumerable&lt;T&gt;。当某人Subscribes 到一个可观察序列然后立即取消订阅时,他们只会观察订阅期间同步推送的通知。这相当于同步枚举IEnumerable&lt;T&gt;。所以我的建议是通过编写基于IEnumerable&lt;T&gt; 的代码来简化你的生活,然后在使用ToObservable Rx 运算符将其转换为可观察后将序列传递给包:

public static IEnumerable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)

    //...
    foreach (var line in File.ReadLines(path))
    
        yield return ParseData(line);
    
    //...

从可枚举到可观察的转换:

IObservable<Data> data = GetHistoricalData(date1, date2).ToObservable();

【讨论】:

这是否意味着我必须将整个数据集保存在内存中才能使其可观察?对于多 GB 大小的数据集,这将是一种令人望而却步的方法。虽然我确实同意这种可观察到的东西感觉有点强迫。 @Smileynator 我不这么认为。您一次从文件中读取一行,然后解析它,然后 yield 解析后的 Data。然后将该数据推送到包提供的观察者。包对它们做任何它想做的事,然后控制权返回给你,从文件中读取下一行。当包调用Subscribe 方法时,一切都会发生。当Subscribe 返回时,一切都会完成。处理 IDisposable 将是无操作的。 @Smileynator 如果你想解析下一段数据,而包对前一段进行处理,换句话说,如果你想引入并行性,你可以考虑使用OffloadEnumeration我在this 答案底部附近发布的方法。你可以这样使用它:IObservable&lt;Data&gt; data = GetHistoricalData(date1, date2).OffloadEnumeration().ToObservable();【参考方案3】:

给你。一个很好的查询:

public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate) =>
    from n in Observable.Range(0, int.MaxValue)
    let d = startDate.AddDays(n)
    where d < endDate
    let path = $"C:\\MYPATH\\d.ToString("yyyy-MM-dd").csv"
    where File.Exists(path)
    from l in Observable.Using(
        () => File.Open(path, FileMode.Open, FileAccess.Read),
        s => Observable.Using(
            () => new StreamReader(s),
            sr => Observable.While(
                () => !sr.EndOfStream,
                Observable.Defer(
                    () => Observable.FromAsync(
                    () => sr.ReadLineAsync())))))
    select ParseData(l);

【讨论】:

以上是关于创建一个 IObservable<T> 正确返回大量数据(异步?)的主要内容,如果未能解决你的问题,请参考以下文章

是否可以将 IObservable<T> 转换为 IAsyncEnumerable<T>

IObservable<T> 和 INotifyPropertyChanged - 是不是存在连接

无效的IObservable转换,运行时异常

如何创建一个表示其他两个可观察对象完成的可观察对象?

创建和订阅简单的观察队列

如何从 ShowDialog 返回 IObservable<DialogResult>