一个可作为 ReplaySubject 但仅适用于第一个订阅者的 Rx 可观察对象?

Posted

技术标签:

【中文标题】一个可作为 ReplaySubject 但仅适用于第一个订阅者的 Rx 可观察对象?【英文标题】:An Rx observable that would act as ReplaySubject but only for the 1st subscriber? 【发布时间】:2021-11-21 21:07:11 【问题描述】:

构成类似于ReplaySubject 的Rx observable 的优雅方式是什么,但只发出累积序列一次 并且仅针对第一个订阅者(当该订阅者连接时)?第一次订阅后,它应该就像普通的Subject

这适用于 .NET 项目,但我同样感谢 javascript/RxJS 的答案。

我在谷歌上搜索了可能的解决方案,我即将推出自己的解决方案,最终类似于 how I approached DistinctSubject

【问题讨论】:

这里的用例是什么? @Enigmativity,我有一个案例,我缓冲了一些事件,当他们开始观察时,多个消费者可以为第一批比赛。所以它就像 ReplaySubject,但只对赢得比赛的人进行重播。这可能是我能描述它的最好方式。我想我可以用纯 TPL 解决它,但从学习的角度来看,我也对 Rx 解决方案感兴趣。 几个月前我问过类似的问题:How to make a lightweight Replay operator that can be subscribed only once? @TheodorZoulias,tks,让我想,也许这种情况并不那么奇特 :) 就我而言,我还需要确保其他订阅者将收到新的新事件,但不是在此之前第一个订阅者已观察到所有初始缓冲区(然后不再缓冲)。我会解决的,我有一个 TPL 解决方案,所以我不着急。 【参考方案1】:

我稍微修改了similar question中的实现,并将类的名称从ReplayOnceSubject更改为ReplayFirstSubscriberOnlySubject

public class ReplayFirstSubscriberOnlySubject<T> : ISubject<T>

    private readonly object _locker = new object();
    private ISubject<T> _subject = new ReplaySubject<T>();

    public void OnNext(T value)  lock (_locker) _subject.OnNext(value); 
    public void OnError(Exception error)  lock (_locker) _subject.OnError(error); 
    public void OnCompleted()  lock (_locker) _subject.OnCompleted(); 

    public IDisposable Subscribe(IObserver<T> observer)
    
        if (observer == null) throw new ArgumentNullException(nameof(observer));
        lock (_locker)
        
            if (_subject is ReplaySubject<T> replaySubject)
            
                var subject = new Subject<T>();
                var subscription = subject.Subscribe(observer);
                // Now replay the buffered notifications
                replaySubject.Subscribe(subject).Dispose();
                replaySubject.Dispose();
                _subject = subject;
                return subscription;
            
            else
                return _subject.Subscribe(observer);
        
    

这可能不是最有效的解决方案,因为每次操作都会获取两个不同的 locks(_locker 和 internal _gate),但它 也应该不会很糟糕。

【讨论】:

很好!唯一的事情是,锁将阻塞事件生产者(那些调用OnNext),而缓冲区被播放给第一个订阅者。再说一次,ReplaySubject behaves 也是如此。谢谢! @noseratio 当我实现 ReplayOnceSubject 时,我从无锁实现开始 (Interlocked.CompareExchange)。但事情并不顺利,我不记得原因了,所以我最终选择了值得信赖的lock。 ? 顺便说一句,您知道吗?使用lock ... 会导致 STA 线程(WinForms、WPF 等)出现严重的重入问题。我曾经有一个心理画面,Monitor.Enter/Exit 是 Win32 关键部分的包装器,但事实证明它们不是:github.com/mono/SkiaSharp/issues/1383#issuecomment-927750884 @noseratio 我不知道具体的 STA 细微差别,但我非常清楚,如果我在 lock 受保护区域内 CancelCancellationTokenSourceTrySetResult TaskCompletionSource ,很可能所有的地狱都会崩溃。 ? @noseratio 顺便说一句,我仓促地实现了IDisposable 接口,没有检查如果在已处置的主题上调用Subscribe 会发生什么。如果您打算将这个类投入生产,您可能想看看它。

以上是关于一个可作为 ReplaySubject 但仅适用于第一个订阅者的 Rx 可观察对象?的主要内容,如果未能解决你的问题,请参考以下文章

pipenv install --system 但仅适用于用户

Mpdf - 全页大小的图像,但仅适用于单页

3 个最新值,但仅适用于特定用户

如何将程序与文件类型关联,但仅适用于当前用户?

Web 请求失败,但仅适用于 localhost 和使用 System.Net.HttpWebRequest

Excel 导出日期格式出错,但仅适用于某些日期值