为啥在 .NET 响应式扩展中不推荐主题?
Posted
技术标签:
【中文标题】为啥在 .NET 响应式扩展中不推荐主题?【英文标题】:Why are Subjects not recommended in .NET Reactive Extensions?为什么在 .NET 响应式扩展中不推荐主题? 【发布时间】:2013-01-02 00:24:58 【问题描述】:我目前正在掌握 .NET 的响应式扩展框架,并且正在通过我找到的各种介绍资源(主要是 http://www.introtorx.com)进行工作
我们的应用程序涉及许多检测网络帧的硬件接口,这些将是我的 IObservables,然后我有各种组件将消耗这些帧或对数据执行某种方式的转换并产生一种新类型的帧.例如,还会有其他组件需要每隔 n 帧显示一次。 我确信 Rx 将对我们的应用程序有用,但是我正在努力解决 IObserver 接口的实现细节。
我读过的大部分(如果不是全部)资源都说我不应该自己实现 IObservable 接口,而是使用提供的函数或类之一。
根据我的研究,创建Subject<IBaseFrame>
似乎可以满足我的需求,我将拥有从硬件接口读取数据的单线程,然后调用我的Subject<IBaseFrame>
实例的 OnNext 函数。然后,不同的 IObserver 组件将收到来自该主题的通知。
我的困惑来自this tutorial 附录中的建议:
避免使用主题类型。 Rx 实际上是一种函数式编程范式。使用主题意味着我们现在正在管理状态,这可能会发生变化。同时处理变异状态和异步编程是非常困难的。此外,许多运算符(扩展方法)都经过精心编写,以确保订阅和序列的正确和一致的生命周期得以维持;当你介绍主题时,你可以打破这个。如果您明确使用主题,未来版本的性能也可能会显着下降。
我的应用程序对性能非常关键,我显然会在使用 Rx 模式之前测试它的性能,然后再将其用于生产代码;但是我担心我正在使用 Subject 类做一些违背 Rx 框架精神的事情,并且该框架的未来版本会损害性能。
有没有更好的方法来做我想做的事?无论是否有任何观察者,硬件轮询线程都将持续运行(否则硬件缓冲区将备份),因此这是一个非常热的序列。然后我需要将接收到的帧传递给多个观察者。
任何建议将不胜感激。
【问题讨论】:
它确实帮助我理解了这个主题,我只是想清楚如何在我的应用程序中使用它。我知道它们是正确的——我有一个非常面向推送的组件管道,我需要在 UI 线程上进行各种过滤和调用以在 GUI 中显示以及缓冲最后接收到的帧等等等 - 我只需要确保我第一次就做对了! 【参考方案1】:好的, 如果我们忽略我的教条方式并忽略“主题是好/坏”。让我们看看问题空间。
我敢打赌,您需要适应 2 种系统风格中的 1 种。
-
系统会在消息到达时引发事件或回调
您需要轮询系统以查看是否有任何消息要处理
对于选项 1,很简单,我们只需使用适当的 FromEvent 方法将其包装起来,就完成了。去酒吧!
对于选项 2,我们现在需要考虑如何轮询以及如何有效地执行此操作。另外,当我们获得价值时,我们如何发布它?
我想你会想要一个专用线程来进行轮询。您不希望其他编码人员敲打 ThreadPool/TaskPool 并让您处于 ThreadPool 饥饿的境地。或者,您不想要上下文切换的麻烦(我猜)。所以假设我们有自己的线程,我们可能会有某种 While/Sleep 循环,我们坐在那里进行轮询。当检查发现一些消息时,我们会发布它们。好吧,所有这些听起来都非常适合 Observable.Create。现在我们可能不能使用 While 循环,因为它不允许我们返回 Disposable 以允许取消。幸运的是,您已经阅读了整本书,因此对递归调度很了解!
我想这样的事情可能会奏效。 #未测试
public class MessageListener
private readonly IObservable<IMessage> _messages;
private readonly IScheduler _scheduler;
public MessageListener()
_scheduler = new EventLoopScheduler();
var messages = ListenToMessages()
.SubscribeOn(_scheduler)
.Publish();
_messages = messages;
messages.Connect();
public IObservable<IMessage> Messages
get return _messages;
private IObservable<IMessage> ListenToMessages()
return Observable.Create<IMessage>(o=>
return _scheduler.Schedule(recurse=>
try
var messages = GetMessages();
foreach (var msg in messages)
o.OnNext(msg);
recurse();
catch (Exception ex)
o.OnError(ex);
);
);
private IEnumerable<IMessage> GetMessages()
//Do some work here that gets messages from a queue,
// file system, database or other system that cant push
// new data at us.
//
//This may return an empty result when no new data is found.
我真的不喜欢主题的原因是,这通常是开发人员对问题没有真正清晰设计的情况。攻入一个主题,到处戳它,然后让可怜的支持开发者猜测 WTF 正在继续。当您使用 Create/Generate 等方法时,您正在本地化对序列的影响。您可以通过一种方法查看所有内容,并且您知道没有其他人会产生令人讨厌的副作用。如果我看到一个主题字段,我现在必须去寻找一个班级中所有正在使用它的地方。如果某个 MFer 公开暴露了一个,那么所有的赌注都没有了,谁知道这个序列是如何使用的! 异步/并发/接收很难。您无需让副作用和因果关系编程更让您头疼。
【讨论】:
我现在正在阅读这个答案,但我觉得我应该指出我永远不会考虑公开主题界面!我正在使用它在密封类中提供 IObservable 实现(公开 IObservable)。我肯定明白为什么暴露 Subject 界面会是一件坏事™ 嘿,很抱歉,我只是不太了解您的代码。 ListenToMessages() 和 GetMessages() 在做什么和返回? 对于您的个人项目@jeromerg,这可能没问题。然而,根据我的经验,开发人员在 WPF、MVVM、单元测试 GUI 设计中苦苦挣扎,然后投入 Rx 会使事情变得更加复杂。我已经尝试过 BehaviourSubject-as-a-property 模式。但是我发现如果我们使用标准的 INPC 属性,然后使用简单的扩展方法将其转换为 IObservable,它对其他人来说更容易采用。此外,您将需要自定义 WPF 绑定来处理您的行为主题。现在你可怜的团队也必须学习 WPF、MVVM、Rx 和你的新框架。 @LeeCampbell,就您的代码示例而言,通常的方式是 MessageListener 由系统构造(您可能以某种方式注册了类名),并且您被告知然后系统将调用 OnCreate() 和 OnGoodbye(),并在生成消息时调用 message1()、message2() 和 message3()。似乎 messageX[123] 会在主题上调用 OnNext,但有更好的方法吗? @JamesMoore 因为这些事情用具体的例子更容易解释。如果您知道使用 Rx 和 Subjects 的开源 android 应用程序,那么也许我可以找时间看看我是否可以提供更好的方法。我明白站在基座上说科目不好并不是很有帮助。但我认为像 IntroToRx、RxCookbook 和 ReactiveTrader 这样的东西都给出了如何使用 Rx 的不同层次的例子。【参考方案2】:一般而言,您应该避免使用Subject
,但是对于您在这里所做的事情,我认为它们工作得很好。当我在 Rx 教程中遇到“避免主题”消息时,我询问了 similar question。
引用Dave Sexton(Rxx)
“主题是 Rx 的有状态组件。它们在以下情况下很有用 您需要创建一个类似事件的可观察对象作为字段或本地 变量。”
我倾向于将它们用作 Rx 的入口点。因此,如果我有一些代码需要说“发生了一些事情”(就像你有的那样),我会使用Subject
并调用OnNext
。然后将其公开为IObservable
以供其他人订阅(您可以在您的主题上使用AsObservable()
以确保没有人可以投射到主题并搞砸事情)。
您也可以使用 .NET 事件并使用 FromEventPattern
来实现此目的,但如果我只是要将事件转换为 IObservable
,我看不出有事件而不是有什么好处Subject
(这可能意味着我在这里遗漏了一些东西)
但是,您应该强烈避免使用Subject
订阅IObservable
,即不要将Subject
传递给IObservable.Subscribe
方法。
【讨论】:
为什么需要状态?如我的回答所示,如果您将问题分解为单独的部分,则您根本不需要管理状态。在这种情况下不应使用主题。 @casperOne 您不需要 Subject当你管理一个主题时,你实际上只是在重新实现 Rx 中已经存在的功能,而且可能不是以一种健壮、简单和可扩展的方式。
当您尝试将一些异步数据流调整到 Rx(或从当前非异步的数据流创建异步数据流)时,最常见的情况通常是:
数据源是一个事件:正如 Lee 所说,这是最简单的情况:使用 FromEvent 并前往酒吧。
数据源来自同步操作,您需要轮询更新,(例如 web 服务或数据库调用):在这种情况下,您可以使用 Lee 建议的方法,或者简单在这种情况下,您可以使用 Observable.Interval.Select(_ => <db fetch>)
之类的东西。您可能希望使用 DistinctUntilChanged() 来防止在源数据中没有任何更改时发布更新。
数据源是某种异步 api 调用您的回调:在这种情况下,使用 Observable.Create 连接您的回调以调用 OnNext/OnError/OnComplete观察者。
数据的来源是一个阻塞直到有新数据可用的调用(例如一些同步套接字读取操作):在这种情况下,你可以使用 Observable.Create 来包装命令式从套接字读取并在读取数据时发布到 Observer.OnNext 的代码。这可能与您对主题所做的类似。
使用 Observable.Create 与创建一个管理 Subject 的类相当等同于使用 yield 关键字与创建一个实现 IEnumerator 的整个类。当然,你可以把 IEnumerator 写得像 yield 代码一样干净和好公民,但是哪个封装更好,感觉设计更整洁呢? Observable.Create 与管理 Subjects 也是如此。
Observable.Create 为您提供了一个干净的模式,用于惰性设置和干净的拆卸。您如何通过包装主题的类来实现这一点?你需要某种 Start 方法......你怎么知道什么时候调用它?还是你总是启动它,即使没有人在听?完成后,如何让它停止从套接字读取/轮询数据库等?你必须有某种 Stop 方法,而且你不仅要访问你订阅的 IObservable,还要访问最初创建 Subject 的类。
使用 Observable.Create,一切都集中在一个地方。 Observable.Create 的主体在有人订阅之前不会运行,所以如果没有人订阅,你就永远不会使用你的资源。并且 Observable.Create 返回一个 Disposable 可以干净地关闭您的资源/回调等 - 当观察者取消订阅时调用它。您用于生成 Observable 的资源的生命周期与 Observable 本身的生命周期紧密相关。
【讨论】:
对 Observable.Create 的解释非常清楚。谢谢! 我仍然有使用主题的情况,其中代理对象公开可观察对象(比如说它只是一个可变属性)。不同的组件将调用代理,告知该属性何时更改(通过方法调用),并且该方法执行 OnNext。消费者订阅。我想我会在这种情况下使用 BehaviorSubject,这样合适吗? 视情况而定。良好的 Rx 设计倾向于将系统转变为异步/反应式架构。很难将响应式代码的小组件与命令式设计的系统干净地集成在一起。创可贴解决方案是使用主题将命令式操作(函数调用、属性集)转换为可观察事件。然后你会得到一小部分反应性代码,没有真正的“啊哈!”片刻。将设计更改为对数据流进行建模并对其做出反应通常会带来更好的设计,但这是一种普遍的变化,需要思维方式的转变和团队的支持。 我会在此声明(作为没有经验的 Rx):通过使用主题,您可以在一个不断增长的命令式应用程序中进入 Rx 的世界并慢慢对其进行改造。也为了获得第一次经验....当然后来你的代码从一开始就应该是这样的(大声笑)。但首先我认为使用主题可能是值得的。【参考方案4】:引用的块文本几乎解释了为什么你不应该使用Subject<T>
,但更简单地说,你正在结合观察者和可观察者的功能,同时在两者之间注入某种状态(无论你是封装或扩展)。
这就是你遇到麻烦的地方;这些职责应该是相互独立的。
也就是说,在您具体的情况下,我建议您将您的顾虑分解成更小的部分。
首先,您的线程很热,并且始终监视硬件以获取发出通知的信号。你通常会怎么做? Events。所以让我们开始吧。
让我们定义您的事件将触发的EventArgs
。
// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
public BaseFrameEventArgs(IBaseFrame baseFrame)
// Validate parameters.
if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");
// Set values.
BaseFrame = baseFrame;
// Poor man's immutability.
public IBaseFrame BaseFrame get; private set;
现在,将触发事件的类。请注意,这可能是一个静态类(因为您总是有一个线程在运行监视硬件缓冲区),或者您按需调用的订阅 that 的东西。您必须根据需要对其进行修改。
public class BaseFrameMonitor
// You want to make this access thread safe
public event EventHandler<BaseFrameEventArgs> HardwareEvent;
public BaseFrameMonitor()
// Create/subscribe to your thread that
// drains hardware signals.
所以现在你有了一个公开事件的类。 Observables 可以很好地处理事件。如果您遵循标准事件模式,则可以通过Observable
class 上的static FromEventPattern
method 将事件流(将事件流视为事件的多次触发)转换为IObservable<T>
实现的一流支持。 .
使用您的事件源和FromEventPattern
方法,我们可以轻松创建IObservable<EventPattern<BaseFrameEventArgs>>
(EventPattern<TEventArgs>
class 体现了您在.NET 事件中看到的内容,特别是从@987654339 派生的实例@ 和一个代表发送者的对象),像这样:
// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();
// Create the observable. It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
FromEventPattern<BaseFrameEventArgs>(
h => source.HardwareEvent += h,
h => source.HardwareEvent -= h);
当然,您想要一个 IObservable<IBaseFrame>
,但这很简单,使用 Observable
类上的 Select
extension method 创建一个投影(就像您在 LINQ 中所做的那样,我们可以将所有这些包装在一个易于使用的方法):
public IObservable<IBaseFrame> CreateHardwareObservable()
// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();
// Create the observable. It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
FromEventPattern<BaseFrameEventArgs>(
h => source.HardwareEvent += h,
h => source.HardwareEvent -= h);
// Return the observable, but projected.
return observable.Select(i => i.EventArgs.BaseFrame);
【讨论】:
感谢您的回复@casperOne,这是我最初的方法,但添加一个事件以便我可以用 Rx 包装它感觉“错误”。我目前使用委托(是的,我知道这正是事件的含义!)以适应用于加载和保存配置的代码,这必须能够重建组件管道并且委托系统给了我最多灵活性。 Rx 现在在这方面让我头疼,但框架中其他所有功能的强大功能使得解决配置问题非常值得。 @Anthony 如果您可以让他的代码示例正常工作,那就太好了,但正如我所评论的那样,这没有任何意义。至于感觉“错误”,我不知道为什么将事物细分为逻辑部分似乎是“错误的”,但是您在原始帖子中没有提供足够的细节来说明如何最好地将其翻译为IObservable<T>
,因为没有关于给出了您当前如何使用该信息发出信号。
@casperOne 在您看来,主题的使用是否适合消息总线/事件聚合器?
@kitsune 不,我不明白他们为什么会这样做。如果您正在考虑“优化”,您必须问这是否是问题所在,您是否测量过 Rx 是问题的原因?
我同意 casperOne 的观点,即拆分关注点是个好主意。我想指出,如果你使用 Hardware to Event to Rx 模式,你会失去错误语义。任何丢失的连接或会话等都不会暴露给消费者。现在消费者无法决定是否要重试、断开连接、订阅另一个序列或其他什么。【参考方案5】:
概括地说主题不适合用于公共接口是不好的。 虽然这确实不是响应式编程方法应该有的样子,但它绝对是经典代码的一个很好的改进/重构选项。
如果您有一个带有公共 set 访问器的普通属性,并且您想通知更改,则没有什么反对用 BehaviorSubject 替换它的。 INPC 或其他其他事件并不是那么干净,它让我个人感到厌烦。 为此,您可以并且应该将 BehaviorSubjects 用作公共属性而不是普通属性,并放弃 INPC 或其他事件。
此外,Subject-interface 使您界面的用户更加了解您的属性的功能,并且更有可能订阅而不是仅仅获得价值。
如果您希望其他人收听/订阅属性的更改,最好使用它。
【讨论】:
以上是关于为啥在 .NET 响应式扩展中不推荐主题?的主要内容,如果未能解决你的问题,请参考以下文章