Rx.NET 中是不是存在功能类似于 BehaviorSubject 但仅在值发生更改时才发出的 Subject 实现?
Posted
技术标签:
【中文标题】Rx.NET 中是不是存在功能类似于 BehaviorSubject 但仅在值发生更改时才发出的 Subject 实现?【英文标题】:Is there a Subject implementation in Rx.NET that functionally resembles BehaviorSubject but emits only if the value has changed?Rx.NET 中是否存在功能类似于 BehaviorSubject 但仅在值发生更改时才发出的 Subject 实现? 【发布时间】:2021-11-13 06:03:40 【问题描述】:Rx.NET 中是否有 Subject
实现在功能上类似于 BehaviorSubject
但只有在它发生变化时才会发出下一个值?
我对 Reactive Extensions 比较陌生,我似乎找不到类似的东西,尽管这种模式感觉像是 INotifyPropertyChanged
的自然替代品。
我的幼稚实现是封装BehaviorSubject<T>
,如下所示。与使用 Observable.DistinctUntilChanged
创建可组合的 observable 相比,这有什么缺点吗?
public class DistinctSubject<T> : SubjectBase<T>
private readonly BehaviorSubject<T> _subject;
public DistinctSubject(T initialValue) =>
_subject = new BehaviorSubject<T>(initialValue);
public T Value
get => _subject.Value;
set => this.OnNext(value);
public override bool HasObservers => _subject.HasObservers;
public override bool IsDisposed => _subject.IsDisposed;
public override void Dispose() => _subject.Dispose();
public override void OnCompleted() => _subject.OnCompleted();
public override void OnError(Exception error) => _subject.OnError(error);
public override void OnNext(T value)
if (!EqualityComparer<T>.Default.Equals(value, _subject.Value))
_subject.OnNext(value);
public override IDisposable Subscribe(IObserver<T> observer) =>
_subject.Subscribe(observer);
【问题讨论】:
不要实现自己的IObservable
、IObserver
或 ISubject
对象。你很可能会弄错它们并导致自己悲伤。您永远不想将主题的观察者部分暴露给外界。这就是为什么有AsObservable()
运算符的原因。如果你让外部代码调用 OnCompleted
你的 observable 就死掉了。
@Enigmativity,来自the thread,您指出:“主题是 Rx 的有状态组件。当您需要创建类似事件的 observable 作为字段或局部变量”。我认为这正是我在这里想要做的,我的应用程序主要状态的基本可观察存储。在某种程度上,它类似于 React 人员对 Recoil's atoms 所做的事情。
@noseratio - 它们对此很有用,但仅仅因为它们对某些东西有用并不能否定关于不使用它们的警告。您需要了解陷阱是什么。
@noseratio - Theo 的实现非常好,但它也应该像内部字段一样实现IDisposable
。
@noseratio - 请不要编辑问题以显示答案。添加您自己的答案。
【参考方案1】:
看了一眼BehaviorSubject<T>
类的source code 之后,如果OnError
后跟OnNext
,您的DistinctSubject<T>
实现的行为似乎会有所不同:
var subject = new DistinctSubject<int>(2021);
subject.OnError(new ApplicationException());
subject.OnNext(2022); // throws ApplicationException
这会抛出,而对BehaviorSubject<T>
执行相同操作不会抛出(OnNext
会被忽略)。
我的建议是在实现中使用DistinctUntilChanged
运算符,如下所示:
public class DistinctSubject<T> : ISubject<T>, IDisposable
private readonly BehaviorSubject<T> _subject;
private readonly IObservable<T> _distinctUntilChanged;
public DistinctSubject(T initialValue, IEqualityComparer<T> comparer = default)
_subject = new BehaviorSubject<T>(initialValue);
_distinctUntilChanged = _subject.DistinctUntilChanged(
comparer ?? EqualityComparer<T>.Default);
public T Value => _subject.Value;
public void OnNext(T value) => _subject.OnNext(value);
public void OnError(Exception error) => _subject.OnError(error);
public void OnCompleted() => _subject.OnCompleted();
public IDisposable Subscribe(IObserver<T> observer) =>
_distinctUntilChanged.Subscribe(observer);
public void Dispose() => _subject.Dispose();
如果您担心对象的不必要分配,那么您还没有熟悉 Rx 的精神。这个库是关于功能和易用性的,而不是关于performance 或效率!
【讨论】:
另一件事,我认为我的转发实现在比较OnNext(T value)
中的新旧值时存在竞争条件,而DistinctUntilChanged
可能没有。这对我来说现在不是问题,因为这一切都发生在主线程上,但这是 DistinctUntilChanged
的另一个优点。
不错的实现,但有一件小事——因为BehaviorSubject<T>
实现了IDisposable
,那么这个类也应该如此。
是的,我心想,我要用 observables 替换应用程序的大部分状态属性,反正我有什么损失! ? 但话又说回来,它有助于理解边缘情况,比如订阅一个已经停止或以错误结束的主题。
@TheodorZoulias 恕我直言,这些天来这些规则变得更加宽松?很多人(包括我在内)使用IDisposable
来确定@987654339@ 的范围,就像C# 中的lock
模式一样。通常它是在 .NET 本身中烘焙的,例如TransactionScope
.
@noseratio 既然您遇到了从头开始实现DistinctSubject<T>
的麻烦,您不妨删除所有线程同步功能(_lock
字段),以避免减慢您的单-线程应用程序。当单个线程完成所有工作时,必须在每个操作上使用 lock
纯属开销。以上是关于Rx.NET 中是不是存在功能类似于 BehaviorSubject 但仅在值发生更改时才发出的 Subject 实现?的主要内容,如果未能解决你的问题,请参考以下文章
Edge 浏览器是不是有与 NoScript 功能类似的插件?
在 TFS2017 中是不是有类似于 Github 发布的功能?
javascript中是不是有类似于php中的compact的功能?
Omnipay PayPal Express 中是不是存在信用卡功能?还是仅在 PayPal Pro 中?