创建一个 IObservable<T> 正确返回大量数据(异步?)
Posted
技术标签:
【中文标题】创建一个 IObservable<T> 正确返回大量数据(异步?)【英文标题】:Creating an IObservable<T> that returns properly a ton of data (asynchronously?) 【发布时间】:2021-12-14 22:13:39 【问题描述】:我对@987654321@ 不是很熟悉,但我正在使用的一个包以某种方式强加给我。
我预计会返回一个IObservable<T>
,稍后会在其上调用订阅,然后立即处理其结果。我的意图是用来自海量数据集的数据填充它。它从文本文件中逐行读取 GB 的数据。
但我似乎无法找到一个很好的例子来说明如何将我的一小时的 while 循环变成一个可观察的,并且它不希望预先读取所有数据。
我看到Observable.FromAsync
有一些变体,但任务也不是我的强项,似乎也无法让它发挥作用。
到目前为止,我取得的最好成绩如下。哪个编译并运行,但绝对什么都不做。据我所知,从不调用Create
中的代码。
public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
return Observable.Create<Data>(async (subject, token) =>
try
FileStream stream = null;
StreamReader sr = null;
DateTime date = startDate;
string path = string.Format("C:\\MYPATH\\0-1-2.csv", date.Year,
date.Month.ToString("00"), date.Day.ToString("00"));
while (date < endDate)
if (!File.Exists(path))
date.AddDays(1);
continue;
stream = File.Open(path, FileMode.Create, FileAccess.Read);
sr = new StreamReader(stream);
while (!sr.EndOfStream)
string line = await sr.ReadLineAsync();
Data d = ParseData(line);
subject.OnNext(d);
if (stream != null)
sr.Close();
stream.Close();
catch (Exception ex)
try
subject.OnError(ex);
catch (Exception)
Console.WriteLine("An exception was thrown while trying to call" +
" OnError on the observable subject -- means you're not" +
" catching exceptions");
throw;
).Publish();
我什至不确定我想做的事情在技术上是否可行,因为我不确定 Observable 模式是如何工作的。但是由于上下文的原因,它似乎期望服务器连接通常会为其提供DataStream
。所以我认为这是可以做到的。但只有正确组合 Observable 创建方法。
如果有人有一些好的文档可供阅读,以对初学者友好的方式解释这一点,那也很好。
根据要求,下面如何调用该方法,但它主要进入黑盒库。
IObservable<Data> data = GetHistoricalData(
new DateTime(2021, 1, 1, 0, 0, 0, DateTimeKind.Utc),
new DateTime(2021, 1, 5, 0, 0, 0, DateTimeKind.Utc));
// Build charts from data
IObservable<(Data, Chart)> dataWithChart = data.GenerateCharts(TimeFrame);
// Generate signals from the charts
IObservable<(Signal, Chart)> signalsWithChart = dataWithChart.GenerateSignals(
Symbol, strategy);
// We're interested in the signals only
IObservable<Signal> signals = signalsWithChart.SelectSignals();
// Show information from each signal
IDisposable subscription = signals.Subscribe(ShowSignal);
subscription.Dispose();
【问题讨论】:
根据我的经验,使用Observable.Create
几乎总是错误的方式。它会创建可能导致竞争条件的可观察对象,并且您会陷入程序性而非功能性术语的思考中。
您能否也包括如何调用GetHistoricalData
方法?是这样使用的吗? var subscription = GetHistoricalData(date1, date2); subscription.Dispose();
@TheodorZoulias 按要求添加了它,但我最初并没有发布它,因为我认为它没有增加太多价值,因为它通过了 5 次黑匣子。
嗯,所以您说GenerateCharts
、GenerateSignals
、SelectSignals
和ShowSignal
方法都不受您控制?您不能修改这些方法中的任何一个吗?你至少有这些方法的源代码吗?
如果我进行逆向工程,是的。否则,我真的无能为力。
【参考方案1】:
我认为你应该阅读"Introduction to Rx",尤其是关于冷热可观察的部分,以及publish and connect。
您的代码存在几个较小的问题。
我为您的代码制作了一个更简单的版本,在我删除了对.Publish()
的调用后它就可以工作了。而且我相当肯定你不想在这里。
Publish
制作了一个支持多个观察者的包装器。您可以使用Publish()
使其适用于多个观察者,然后在所有观察者都订阅后调用Connect
。但发布更适合“热”流,例如鼠标/键盘事件。您的数据只会被读取一次。在您调用 Connect
之后连接的任何 observable 都不会获得已经读取的数据。如果你需要这样的多个订阅者,我会返回IConnectableObservable
而不是IObservable
,这样一旦所有观察者都订阅了,你就可以Connect()
。
至于你的代码:
保持简单。每当您使用流时,请使用using
,除非您非常清楚为什么不应该这样做。
计算循环内的路径。使用字符串插值代替string.Format()
。
最后拨打subject.OnCompleted()
。
重新分配date
变量。 DateTime
c# 中的值是不可变的。 date.AddDays()
不会修改date
,而是返回一个新的DateTime
。
使用您真正需要的文件模式。如果您不打算在文件不存在的情况下调用代码,则不需要 .Create。
这对我有用:
public IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
return Observable.Create<Data>(async (subject, token) =>
var date = startDate;
try
while (date < endDate)
string path = $@"C:\MYPATH\date.Year-date.Month:00-date.Day:00.csv";
if (File.Exists(path))
using (var stream = File.Open(path, FileMode.Open, FileAccess.Read))
using (var sr = new StreamReader(stream))
while (!sr.EndOfStream)
var line = await sr.ReadLineAsync();
if (!string.IsNullOrWhiteSpace(line))
var data = ParseData(line);
subject.OnNext(data);
date = date.AddDays(1);
catch (Exception e)
subject.OnError(e);
subject.OnCompleted();
);
【讨论】:
你需要在OnError
之后退出while
。出错后无法继续循环。
好吧,至少,它现在正在工作,并且正在处理该方法。它在从第一个文件中读取了 100 行之后就会出错,但这可能不完全是这段代码的错。感谢您指出菜鸟的错误,我现在完全没有专注于这一点(我也忘记在 EOF 被击中后添加一天)我会在您提供的链接中阅读更多内容,然后从那里开始。再次感谢!
当然,我需要确保我的 Main 方法一直运行,直到所有文件处理完成。现在它工作正常。
感谢@Enigmativity,这是在将我的测试代码移动到 SO 时搞砸的。【参考方案2】:
您的代码似乎被一个滥用IObservable<T>
monad 的包所使用,将其视为IEnumerable<T>
。当某人Subscribe
s 到一个可观察序列然后立即取消订阅时,他们只会观察订阅期间同步推送的通知。这相当于同步枚举IEnumerable<T>
。所以我的建议是通过编写基于IEnumerable<T>
的代码来简化你的生活,然后在使用ToObservable
Rx 运算符将其转换为可观察后将序列传递给包:
public static IEnumerable<Data> GetHistoricalData(DateTime startDate, DateTime endDate)
//...
foreach (var line in File.ReadLines(path))
yield return ParseData(line);
//...
从可枚举到可观察的转换:
IObservable<Data> data = GetHistoricalData(date1, date2).ToObservable();
【讨论】:
这是否意味着我必须将整个数据集保存在内存中才能使其可观察?对于多 GB 大小的数据集,这将是一种令人望而却步的方法。虽然我确实同意这种可观察到的东西感觉有点强迫。 @Smileynator 我不这么认为。您一次从文件中读取一行,然后解析它,然后yield
解析后的 Data
。然后将该数据推送到包提供的观察者。包对它们做任何它想做的事,然后控制权返回给你,从文件中读取下一行。当包调用Subscribe
方法时,一切都会发生。当Subscribe
返回时,一切都会完成。处理 IDisposable
将是无操作的。
@Smileynator 如果你想解析下一段数据,而包对前一段进行处理,换句话说,如果你想引入并行性,你可以考虑使用OffloadEnumeration
我在this 答案底部附近发布的方法。你可以这样使用它:IObservable<Data> data = GetHistoricalData(date1, date2).OffloadEnumeration().ToObservable();
【参考方案3】:
给你。一个很好的查询:
public static IObservable<Data> GetHistoricalData(DateTime startDate, DateTime endDate) =>
from n in Observable.Range(0, int.MaxValue)
let d = startDate.AddDays(n)
where d < endDate
let path = $"C:\\MYPATH\\d.ToString("yyyy-MM-dd").csv"
where File.Exists(path)
from l in Observable.Using(
() => File.Open(path, FileMode.Open, FileAccess.Read),
s => Observable.Using(
() => new StreamReader(s),
sr => Observable.While(
() => !sr.EndOfStream,
Observable.Defer(
() => Observable.FromAsync(
() => sr.ReadLineAsync())))))
select ParseData(l);
【讨论】:
以上是关于创建一个 IObservable<T> 正确返回大量数据(异步?)的主要内容,如果未能解决你的问题,请参考以下文章
是否可以将 IObservable<T> 转换为 IAsyncEnumerable<T>