反应式框架(RX)和异步处理事件

Posted

技术标签:

【中文标题】反应式框架(RX)和异步处理事件【英文标题】:Reactive Framework (RX) and dealing with events Asynchronously 【发布时间】:2010-07-09 17:21:12 【问题描述】:

所以我只是在玩 RX 并学习它。我开始玩事件,想知道如何订阅事件,并异步处理结果。请允许我用代码解释一下:

引发事件的简单类:

public class EventRaisingClass

   public event EventHandler<SomeEventArgs> EventOccured;

   //some other code that raises event...


public class SomeEventArgs : EventArgs

    public SomeEventArgs(int data)
    
        this.SomeArg = data;
    

    public int SomeArg  get; private set; 

然后是我的主:

public static void Main(string[] args)

    var eventRaiser = new EventRaisingClass();
    IObservable<IEvent<SomeEventArgs>> observable = 
        Observable.FromEvent<SomeEventArgs>(e => eventRaiser.EventOccured += e, e => eventRaiser.EventOccured -= e);

    IObservable<IList<IEvent<SomeEventArgs>>> bufferedEvents = observable.BufferWithCount(100);

    //how can I subscribte to bufferedEvents so that the subscription code gets called Async?
    bufferedEvents.Subscribe(list => /*do something with list of event args*/); //this happens synchrounously...


正如您在我的 cmets 中看到的,当您像这样调用 subscribe 时,所有订阅代码都会同步发生。有没有一种开箱即用的方法使用 RX 让订阅在有新的一批事件需要处理时在不同的线程上被调用?

【问题讨论】:

【参考方案1】:
bufferedEvents.ObserveOn(Scheduler.TaskPool).Subscribe(...

SubscribeOn 是指定所谓的“订阅副作用”发生的时间表。例如,您的 observable 可以在每次有人订阅时打开一个文件。

ObserveOn 是指定每次有新值时调用观察者的时间表。在实践中,它比 SubscribeOn 更常用。

【讨论】:

【参考方案2】:

我相信您正在寻找SubscribeOnObserveOn,传递ISchedulerSystem.Concurrency下内置了几个调度器;其中一些使用当前的任何线程,而另一些使用特定的线程。

This video 有更多关于调度器概念的信息。

Rx 团队最近还发布了一个hands-on labs 文档,这是目前最接近教程的内容。

【讨论】:

以上是关于反应式框架(RX)和异步处理事件的主要内容,如果未能解决你的问题,请参考以下文章

RxJava中的概念

Android学习笔记 十六 使用RxBinding响应控件的异步事件

反应式编程 RxJava 设计原理解析

多线程实现简单的事件异步处理框架

Android异步框架RxJava 1.x系列 - 事件及事件序列转换原理

何时使用RX