是否可以将 Rx“使用”运算符与 IAsyncDisposable 一起使用?

Posted

技术标签:

【中文标题】是否可以将 Rx“使用”运算符与 IAsyncDisposable 一起使用?【英文标题】:Is it possible to use Rx "Using" operator with IAsyncDisposable? 【发布时间】:2021-12-16 07:45:31 【问题描述】:

是否可以将 Rx.Net 中的 Using 运算符与实现 IAsyncDisposable 而不是 IDisposable 的资源一起使用?如果没有,我可以使用某种解决方法吗?

【问题讨论】:

【参考方案1】:

这是一个适用于IAsyncDisposable 对象的Using 方法:

/// <summary>
/// Constructs an observable sequence that depends on a resource object,
/// whose lifetime is tied to the resulting observable sequence's lifetime.
/// </summary>
public static IObservable<TResult> Using<TResult, TResource>(
    Func<TResource> resourceFactory,
    Func<TResource, IObservable<TResult>> observableFactory)
    where TResource : IAsyncDisposable

    return Observable.Defer(() =>
    
        TResource resource = resourceFactory();
        IObservable<TResult> observable;
        try  observable = observableFactory(resource); 
        catch (Exception ex)  observable = Observable.Throw<TResult>(ex); 

        Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
        IObservable<TResult> disposer = Observable
            .FromAsync(() => lazyDisposeTask.Value)
            .Select(_ => default(TResult))
            .IgnoreElements();

        return observable
            .Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
            .Concat(disposer)
            .Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult());
    );

此方法与 Rx Observable.Using 方法具有相同的签名(where 子句除外),并且可以以相同的方式使用。

此实现处理所有完成情况:

    成功完成:IAsyncDisposable 资源由 Concat 运算符异步处理。 完成错误:IAsyncDisposable 资源由 Catch 运算符异步处理。 序列在完成之前被取消订阅:IAsyncDisposable 资源由Finally 运算符同步处理。在这种情况下,无法异步处理资源,原因已解释为 here。

带有异步工厂方法的变体:

public static IObservable<TResult> Using<TResult, TResource>(
    Func<CancellationToken, Task<TResource>> resourceFactoryAsync,
    Func<TResource, CancellationToken, Task<IObservable<TResult>>> observableFactoryAsync)
    where TResource : IAsyncDisposable

    return Observable.Create<TResult>(async (observer, cancellationToken) =>
    
        TResource resource = await resourceFactoryAsync(cancellationToken);
        IObservable<TResult> observable;
        try  observable = await observableFactoryAsync(resource, cancellationToken); 
        catch  await resource.DisposeAsync(); throw; 

        Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
        IObservable<TResult> disposer = Observable
            .FromAsync(() => lazyDisposeTask.Value)
            .Select(_ => default(TResult))
            .IgnoreElements();

        return observable
            .Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
            .Concat(disposer)
            .Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult())
            .Subscribe(observer);
    );

【讨论】:

@jackdry 当然,我更新了答案。 非常感谢。出于兴趣,是否有充分的理由为“lazyDisposeTask”使用Lazy&lt;Task&gt; 而不是Func&lt;Task&gt; @jackdry 是的。 Lazy&lt;Task&gt; 确保 DisposeAsync 只会被调用一次。大多数现实世界的一次性用品都可以容忍多次处理,但最好是安全而不是抱歉。 :-) @jackdry 仅供参考,我修复了两个 Using 方法的实现中的错误。 @jackdry - ***.com/questions/17284517/…

以上是关于是否可以将 Rx“使用”运算符与 IAsyncDisposable 一起使用?的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot 2 - 将 Mono 转换为 rx.Observable?

单片机可以将UART的Tx与Rx直接相连吗

在 Adapter for Place API 的 Filter.performFiltering() 中使用 Rx debounce 运算符

rx(rxjs)是来自事件运算符多播吗?

Rx.Net 运算符忽略某些值,直到它们在一定时间内相同

如何实现模块与单片机的RX和TX连接并让它与PC机连接,就是实现TX和RX的分时复用功能