假冒由Observable.FromEvent生成的正在进行的项目

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了假冒由Observable.FromEvent生成的正在进行的项目相关的知识,希望对你有一定的参考价值。

我的目标是将来自IObservable<T>的所有项目/通知假脱机以供将来的订阅者使用。

例如。如果有人订阅消息流,首先他会收到订阅之前的所有消息。然后他开始接收新消息,无论什么时候都有。这应该无缝地发生,而不会在新旧消息之间的“边界”上重复和丢失。

我想出了以下扩展方法:

public static IObservable<T> WithHistory<T>(this IObservable<T> source)
{
    var accumulator = new BlockingCollection<T>();

    source.Subscribe(accumulator.Add);

    return accumulator
        .GetConsumingEnumerable()
        .ToObservable()
        .SubscribeOn(ThreadPoolScheduler.Instance);
}

据我测试,它的工作原理:

class Generator<T>
{
    event Action<T> onPush;

    public IObservable<T> Items =>
        Observable.FromEvent<T>(d => onPush += d, d => onPush -= d);

    public void Push(T item) => onPush?.Invoke(item);
}

...

private static void Main()
{
    var g = new Generator<int>();
    var ongoingItems = g.Items;
    var allItems = g.Items.WithHistory();

    g.Push(1);
    g.Push(2);

    ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
    allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));

    g.Push(3);
    g.Push(4);
    g.Push(5);

    Console.ReadLine();
}

结果:

Ongoing: got 3
Ongoing: got 4
Ongoing: got 5
WithHistory: got 1
WithHistory: got 2
WithHistory: got 3
WithHistory: got 4
WithHistory: got 5

然而,使用BlockingCollection<T>似乎是一种矫枉过正。此外,上述方法不支持完成,错误处理,并且会在没有.SubscribeOn(ThreadPoolScheduler.Instance)的情况下导致死锁。

没有描述的缺陷,有没有更好的方法来实现它?

答案

最好的方法是使用.Replay()

void Main()
{
    var g = new Generator<int>();
    var ongoingItems = g.Items;
    var allItems = g.Items.Replay().RefCount();

    using(var tempSubscriber = allItems.Subscribe())
    {
        g.Push(1);
        g.Push(2);

        ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
        allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));

        g.Push(3);
        g.Push(4);
        g.Push(5);

        Console.ReadLine();
    }
}

.Replay().RefCount()产生一个可观察的,只要有一个用户,它将保留一个内部队列进行重放。如果你有一个持久的订阅者(就像你的解决方案在WithHistory方法中),你有内存泄漏。解决这个问题的最佳方法是让一个临时用户在你不再对历史感兴趣后自动断开连接。

以上是关于假冒由Observable.FromEvent生成的正在进行的项目的主要内容,如果未能解决你的问题,请参考以下文章

Angular2 RxJS 得到“Observable_1.Observable.fromEvent 不是函数”错误

TypeError: Observable_1.Observable.fromEvent 不是 ng2-bs3-modal/ng2-bs3-modal 模块中的函数

ESLint:fromEvent 已定义但从未使用过(no-unused-vars)

unsubscribe 不是 observable 上的函数

RxJS.Observable debounce 有啥作用?

rxjs来啦