是否可以将 Rx“使用”运算符与 IAsyncDisposable 一起使用?
Posted
技术标签:
【中文标题】是否可以将 Rx“使用”运算符与 IAsyncDisposable 一起使用?【英文标题】:Is it possible to use Rx "Using" operator with IAsyncDisposable? 【发布时间】:2021-12-27 10:43:30 【问题描述】:是否可以将 Rx.Net 中的 Using 运算符与实现 IAsyncDisposable
而不是 IDisposable
的资源一起使用?如果没有,我可以使用某种解决方法吗?
【问题讨论】:
【参考方案1】:这是一个适用于IAsyncDisposable
对象的Using
方法:
/// <summary>
/// Constructs an observable sequence that depends on a resource object,
/// whose lifetime is tied to the resulting observable sequence's lifetime.
/// </summary>
public static IObservable<TResult> Using<TResult, TResource>(
Func<TResource> resourceFactory,
Func<TResource, IObservable<TResult>> observableFactory)
where TResource : IAsyncDisposable
return Observable.Defer(() =>
TResource resource = resourceFactory();
IObservable<TResult> observable;
try observable = observableFactory(resource);
catch (Exception ex) observable = Observable.Throw<TResult>(ex);
Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
IObservable<TResult> disposer = Observable
.FromAsync(() => lazyDisposeTask.Value)
.Select(_ => default(TResult))
.IgnoreElements();
return observable
.Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
.Concat(disposer)
.Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult());
);
此方法与 Rx Observable.Using
方法具有相同的签名(where
子句除外),并且可以以相同的方式使用。
此实现处理所有完成情况:
-
成功完成:
IAsyncDisposable
资源由 Concat
运算符异步处理。
完成错误:IAsyncDisposable
资源由 Catch
运算符异步处理。
序列在完成之前被取消订阅:IAsyncDisposable
资源由Finally
运算符同步处理。在这种情况下,无法异步处理资源,原因已解释为 here。
带有异步工厂方法的变体:
public static IObservable<TResult> Using<TResult, TResource>(
Func<CancellationToken, Task<TResource>> resourceFactoryAsync,
Func<TResource, CancellationToken, Task<IObservable<TResult>>> observableFactoryAsync)
where TResource : IAsyncDisposable
return Observable.Create<TResult>(async (observer, cancellationToken) =>
TResource resource = await resourceFactoryAsync(cancellationToken);
IObservable<TResult> observable;
try observable = await observableFactoryAsync(resource, cancellationToken);
catch await resource.DisposeAsync(); throw;
Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
IObservable<TResult> disposer = Observable
.FromAsync(() => lazyDisposeTask.Value)
.Select(_ => default(TResult))
.IgnoreElements();
return observable
.Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
.Concat(disposer)
.Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult())
.Subscribe(observer);
);
【讨论】:
@jackdry 当然,我更新了答案。 非常感谢。出于兴趣,是否有充分的理由为“lazyDisposeTask”使用Lazy<Task>
而不是Func<Task>
?
@jackdry 是的。 Lazy<Task>
确保 DisposeAsync
只会被调用一次。大多数现实世界的一次性用品都可以容忍多次处理,但最好是安全而不是抱歉。 :-)
@jackdry 仅供参考我修复了两个 Using
方法的实现中的错误。
@jackdry - ***.com/questions/17284517/…以上是关于是否可以将 Rx“使用”运算符与 IAsyncDisposable 一起使用?的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot 2 - 将 Mono 转换为 rx.Observable?
在 Adapter for Place API 的 Filter.performFiltering() 中使用 Rx debounce 运算符