创建和订阅简单的观察队列

Posted Ant-double

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了创建和订阅简单的观察队列相关的知识,希望对你有一定的参考价值。

你没有必要自己手动实现Iobservable<T>接口来创建观察队列,同样的,你也没有必要实现Iobserver<T>接口来订阅这个队列。通过安装RX库,RX提供请多静态的方法来创建带有一个参数或多个或没有参数的简单队列。你可以很方便的使用这些静态方法。另外,RX还提供了订阅扩展方法来实现多种多样的OnNext,OnError,OnCompleted句柄委托。
创建和订阅简单的观察队列
下面的例子使用观察者类型的范围操作来创建简单的观察集合,通过Observable类订阅方法来订阅这个观察队列集合,并且提供要处理OnNext,OnError,OnCompleted事件的委托Action.
这个范围操作有很多重载的版本,在我们的例子中,创建一个整数队列从x开始并并生y个数。
一旦开始订阅观察者,值就会被发送给订阅者,那么OnNext的委拖就会被执行。

技术分享
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
class Program
{
static void Main(string[] args)
{
IObservable<int> source = Observable.Range(1, 10);
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
}
}
View Code

 

当一个观察者订阅另一个观察队列时,线程调用订阅的方法可能来源与不同的线程,直到这些线程队列执行完成。因此,在观察队列执行完成前订阅调用是异步的不会阻塞其它线程。我们会在后面讲计划任务里详细讲到。

注意订阅的方法返回的是Idisposable,所以你可以解除订阅关系并且释放资源非常容易。当你调用观察队列的Dispose的方法时,这时观察队列就会停止监听数据。一般情况下你除非要提前结束订阅否则你没有必要手动调用Dispose这个方法,当观察源的寿命比观察队列更长时,RX的设计可以去失去相关的关系情节并不用使用终结器。当IDispose被GC回收时,RX不会自动的释放订阅关系,然而,我们应该注意到当观察者操作释放订阅关系时,订阅关系会被立即释放。如当OnCompeleted或者OnError消息发生时还有可能是var x=Observable.Zip(a,b).Subscribe(),x订阅了a,b如果a异常了那么a,b与x的订阅关系都 会立即被释放。

你可以调整上面的代码通过使用Observable类型的Create方法来返回一个observe,p 在这个方法中定义好OnNext,OnError,OnCompleted的委托,之后你可以通过observer来订阅到观察者类型上代码如下:

 

技术分享
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
class Program
{
static void Main(string[] args)
{
IObservable<int> source = Observable.Range(1, 10);
IObserver<int> obsvr = Observer.Create<int>(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
IDisposable subscription = source.Subscribe(obsvr);
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
}
}
View Code

 

除了自己创建observable队列你可以转换现存的.net事件和异步到观察队列中。其它的主题中会详细说到。
使用一个计时器
接下来我们使用一个计时器来创建一个队列,这个队列将在5秒后开推送数据,之后每1秒推送一次,为了说明问题,我们将在每个操作推出值 的加上时间戳,通过这样,当我们订阅这个数据源队列时我们可以收到它的值 和时间戳。

技术分享
Console.WriteLine(“Current Time: “ + DateTime.Now);

var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
.Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
View Code

 

把普通的队列转换为观察者队列

技术分享
IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };

IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.ReadKey();
View Code

 

冷与热观察者
冷观察者在开始订阅后,观察者只会像订阅的对象发送流,订阅之间是不能共享值 的。
热观察者是可以共享订阅的值 的。每一个订阅者都 会得到推送的值

技术分享
IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1)); 
IDisposable subscription1 = source.Subscribe(
x => Console.WriteLine("Observer 1: OnNext: {0}", x),
ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 1: OnCompleted"));
IDisposable subscription2 = source.Subscribe(
x => Console.WriteLine("Observer 2: OnNext: {0}", x),
ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 2: OnCompleted"));
Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();
View Code

 


热的代码如下:

技术分享
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1)); //creates a sequence
IConnectableObservable<long> hot = Observable.Publish<long>(source); // convert the sequence into a hot sequence
IDisposable subscription1 = hot.Subscribe( // no value is pushed to 1st subscription at this point
x => Console.WriteLine("Observer 1: OnNext: {0}", x),
ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000); //idle for 3 seconds
hot.Connect(); // hot is connected to source and starts pushing value to subscribers 
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000); //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);
IDisposable subscription2 = hot.Subscribe( // value will immediately be pushed to 2nd subscription
x => Console.WriteLine("Observer 2: OnNext: {0}", x),
ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();
View Code

 

源文

 










以上是关于创建和订阅简单的观察队列的主要内容,如果未能解决你的问题,请参考以下文章

利用Redis作消息队列,实现生产消费和发布订阅

设计模式 行为型模式 -- 观察者模式(发布-订阅(Publish/Subscribe)模式)

观察者模式 vs 发布-订阅模式

从一道面试题简单谈谈发布订阅和观察者模式

带有可观察对象的任务队列[关闭]

JS 设计模式八 -- 发布订阅者模式