一个可作为 ReplaySubject 但仅适用于第一个订阅者的 Rx 可观察对象?
Posted
技术标签:
【中文标题】一个可作为 ReplaySubject 但仅适用于第一个订阅者的 Rx 可观察对象?【英文标题】:An Rx observable that would act as ReplaySubject but only for the 1st subscriber? 【发布时间】:2021-11-21 21:07:11 【问题描述】:构成类似于ReplaySubject
的Rx observable 的优雅方式是什么,但只发出累积序列一次 并且仅针对第一个订阅者(当该订阅者连接时)?第一次订阅后,它应该就像普通的Subject
。
这适用于 .NET 项目,但我同样感谢 javascript/RxJS 的答案。
我在谷歌上搜索了可能的解决方案,我即将推出自己的解决方案,最终类似于 how I approached DistinctSubject
。
【问题讨论】:
这里的用例是什么? @Enigmativity,我有一个案例,我缓冲了一些事件,当他们开始观察时,多个消费者可以为第一批比赛。所以它就像 ReplaySubject,但只对赢得比赛的人进行重播。这可能是我能描述它的最好方式。我想我可以用纯 TPL 解决它,但从学习的角度来看,我也对 Rx 解决方案感兴趣。 几个月前我问过类似的问题:How to make a lightweightReplay
operator that can be subscribed only once?
@TheodorZoulias,tks,让我想,也许这种情况并不那么奇特 :) 就我而言,我还需要确保其他订阅者将收到新的新事件,但不是在此之前第一个订阅者已观察到所有初始缓冲区(然后不再缓冲)。我会解决的,我有一个 TPL 解决方案,所以我不着急。
【参考方案1】:
我稍微修改了similar question中的实现,并将类的名称从ReplayOnceSubject
更改为ReplayFirstSubscriberOnlySubject
:
public class ReplayFirstSubscriberOnlySubject<T> : ISubject<T>
private readonly object _locker = new object();
private ISubject<T> _subject = new ReplaySubject<T>();
public void OnNext(T value) lock (_locker) _subject.OnNext(value);
public void OnError(Exception error) lock (_locker) _subject.OnError(error);
public void OnCompleted() lock (_locker) _subject.OnCompleted();
public IDisposable Subscribe(IObserver<T> observer)
if (observer == null) throw new ArgumentNullException(nameof(observer));
lock (_locker)
if (_subject is ReplaySubject<T> replaySubject)
var subject = new Subject<T>();
var subscription = subject.Subscribe(observer);
// Now replay the buffered notifications
replaySubject.Subscribe(subject).Dispose();
replaySubject.Dispose();
_subject = subject;
return subscription;
else
return _subject.Subscribe(observer);
这可能不是最有效的解决方案,因为每次操作都会获取两个不同的 lock
s(_locker
和 internal _gate
),但它
也应该不会很糟糕。
【讨论】:
很好!唯一的事情是,锁将阻塞事件生产者(那些调用OnNext
),而缓冲区被播放给第一个订阅者。再说一次,ReplaySubject
behaves 也是如此。谢谢!
@noseratio 当我实现 ReplayOnceSubject
时,我从无锁实现开始 (Interlocked.CompareExchange
)。但事情并不顺利,我不记得原因了,所以我最终选择了值得信赖的lock
。 ?
顺便说一句,您知道吗?使用lock ...
会导致 STA 线程(WinForms、WPF 等)出现严重的重入问题。我曾经有一个心理画面,Monitor.Enter/Exit
是 Win32 关键部分的包装器,但事实证明它们不是:github.com/mono/SkiaSharp/issues/1383#issuecomment-927750884
@noseratio 我不知道具体的 STA 细微差别,但我非常清楚,如果我在 lock
受保护区域内 Cancel
或 CancellationTokenSource
或 TrySetResult
TaskCompletionSource
,很可能所有的地狱都会崩溃。 ?
@noseratio 顺便说一句,我仓促地实现了IDisposable
接口,没有检查如果在已处置的主题上调用Subscribe
会发生什么。如果您打算将这个类投入生产,您可能想看看它。以上是关于一个可作为 ReplaySubject 但仅适用于第一个订阅者的 Rx 可观察对象?的主要内容,如果未能解决你的问题,请参考以下文章
pipenv install --system 但仅适用于用户