类似 Replay() 的功能,但具有替换陈旧值的能力?

Posted

技术标签:

【中文标题】类似 Replay() 的功能,但具有替换陈旧值的能力?【英文标题】:Replay()-like functionality but with the ability to displace stale values? 【发布时间】:2015-08-21 10:29:52 【问题描述】:

想知道是否有人能想到这个用例的优雅解决方案:

我正在使用一个 observable(TEntity 类型的 IObservable),它为我提供了一个实体流。如果这些实体中的任何一个被更新,那么 observable 的提供者将下推更新的实体。

我在这个流上使用了 Replay(),这样我只需要订阅底层流一次,这样迟到的订阅者就可以看到所有的值。问题是这里可能存在内存泄漏,因为 Replay() 将保留它看到的所有更新,而我只需要每个实体的最新更新。

我可以用 Scan() 替换 Replay() ,它允许我只维护最新更新,但是我必须推出迄今为止观察到的所有更新的字典,而不仅仅是特定实体变了。

我能想到的唯一解决方案是使用上面的 Scan(),但在 Scan() 实现中,我会将所有更新推送到主题中。我将公开的 IObservable 的订阅者将收到存储在 Scan() 字典中的快照的合并以及任何更新,如下所示:

private Subject<Entity> _updateSubject = new Subject<Entity>();

private IObservable<Dictionary<string, Entity>> _subscriptionStream;

//called once on initialisation
private void CreateSubscription()

    IObservable<Entity> source = GetSomeLongRunningSubscriptionStream();

    _subscriptionStream = source
    .Scan(new Dictionary<string, Entity>(), (accumulator,update) =>
    
        accumulator[update.ID] = update;

        _updateSubject.OnNext(update);

        return accumulator;
    )
    .Replay(1);


//called each time a consumer wants access to the stream
public IObservable<Entity> GetStream()

    return _subscriptionStream.Take(1).SelectMany(x => x).Select(x => x.Value)
    .Merge(_updateSubject.AsObservable());

谁能想到一个更优雅的解决方案,将状态保存在单个流中,而不是诉诸主题?

谢谢

**************编辑**************

根据我的评论,我采用了类似的方法。让我知道你的想法

//called once on initialisation
private void CreateSubscription()
        
            _baseSubscriptionObservable = GetSomeLongRunningSubscriptionStream ().Publish();

            _snapshotObservable = _baseSubscriptionObservable
                .Scan(new Dictionary<string,Entity>(), (accumulator, update) =>
                    
                        accumulator[update.ID] = update;

                        return accumulator;
                    )
                .StartWith(new Dictionary<string, Entity>())
                .Replay(1);

            _baseSubscriptionObservable.Connect ();
            _snapshotObservable.Connect ();
        

public IObservable<Entity> GetStream()
        
            return _snapshotObservable.Take (1).Select (x => x.Values).SelectMany (x => x)
                .Merge (_baseSubscriptionObservable);
        

【问题讨论】:

想一想,也许我可以将 2 个已发布的流合并在一起。一个可以是上面发出 Scan/replay(1) 的 Snapshot,另一个可以是提供实时更新的 connect()。 【参考方案1】:

我通常喜欢你所做的,但我可以看到一些问题。

首先,您将CreateSubscriptionGetStream 拆分为两种方法,您的想法是您将拥有一个对GetSomeLongRunningSubscriptionStream() 流的底层订阅。不幸的是,在这种情况下,无论您获得多少订阅到最终的 observable,您都将拥有零订阅,因为 .Replay(1) 返回一个 IConnectableObservable&lt;&gt;,您需要调用 .Connect() 来开始值流。

接下来是用最新值更新累加器,然后在 GetStream 中添加最新值,同时合并到累加器的扁平流中。您每次都会返回两次最新值。

我建议你这样做:

private IObservable<IList<Timestamped<Entity>>> GetStream()

    return
        Observable
            .Create<IList<Timestamped<Entity>>>(o =>
                GetSomeLongRunningSubscriptionStream()
                    .Timestamp()
                    .Scan(
                        new Dictionary<string, Timestamped<Entity>>(),
                        (accumulator, update) =>
                        
                            accumulator[update.Value.ID] = update;
                            return accumulator;
                        )
                    .Select(x => x.Select(y => y.Value).ToList())
                    .Replay(1)
                    .RefCount()
                    .Subscribe(o));

在使用 Rx 时最好避免任何状态(不在可观察对象中本地化)。所以我将CreateSubscriptionGetStream 合并到一个GetStream 方法中,并将整个可观察对象封装到Observable.Create 中。

为了避免两次推送值并帮助您了解最新更新是什么,我添加了对 .Timestamp() 的调用,以输入返回 Entity 的最新时间。

我在字典中保留了.Scan(...),但现在它是Dictionary&lt;string, Timestamped&lt;Entity&gt;&gt;

对于每个添加/更新的值,我会展平字典并将基础值作为列表返回。此时,您可以对列表进行排序,以确保最新的值是第一个或最后一个以满足您的需要。

然后,我使用.Replay(1).RefCount() 组合将.Replay(1) 返回的IConnectableObservable&lt;&gt; 转换回IObservable&lt;&gt;,并理解当所有订阅者都处置时,您将处置基础订阅。这可能是您的查询中最关键的部分。应该这样做。这是确保避免内存泄漏的 Rx 方式。

如果您迫切需要保持底层连接打开,那么您需要将所有代码封装在一个实现 IDisposable 的类中,以清理您需要的 .Connect()

类似这样的:

public class EntityStream : IDisposable 

    private IDisposable _connection = null;

    public EntityStream(IObservable<Entity> someLongRunningSubscriptionStream)
    
        _stream =
            someLongRunningSubscriptionStream
            .Timestamp()
            .Scan(
                new Dictionary<string, Timestamped<Entity>>(),
                (accumulator, update) =>
                
                    accumulator[update.Value.ID] = update;
                    return accumulator;
                )
            .Select(x => x.Select(y => y.Value).ToList())
            .Replay(1);

        _connection = _stream.Connect();
    

    private IConnectableObservable<IList<Timestamped<Entity>>> _stream = null;

    public IObservable<IList<Timestamped<Entity>>> GetStream()
    
        return _stream.AsObservable();
    

    public void Dispose()
    
    if (_connection != null)
    
        _connection.Dispose();
        _connection = null;
    
    

我很少这样做。我会彻底推荐做第一种方法。您应该只在必要时混合 OOP 和 Rx。

如果您需要任何说明,请告诉我。

【讨论】:

感谢您的详细回复。首先,您正确地指出,无论订阅者数量如何,都会创建此订阅。幸运的是,在这种情况下,这个特定的订阅是整个应用程序的驱动程序,因此绝对需要在启动时创建它,并且它会在应用程序的整个生命周期内持续存在。此外,除非我遗漏了什么,否则值不会被推送两次,因为 Take(1) 可确保新订阅者仅在该时间点获得当前快照,并与更新合并。 对于您的解决方案,您似乎每次都在推出整个列表?就我而言,这并没有什么意义,因为对于每次更新,消费者只需要担心特定的更新,而不是他们已经观察到的整个快照。 +1 讨论如何处理这个问题,非常好的建议!我在示例中省略了这一点。 (尽管如上所述,订阅会在应用程序的整个生命周期内持续存在。因此在关闭时会被处理) 我已经用一个稍微干净的解决方案更新了我的问题,如果你有任何想法,我将不胜感激。谢谢 @Chris - 我有点错了 - 只有第一个产生的值是重复的。我测试了原始代码。我确实错过了Take(1) 的重要性。每次添加某些内容时,我都会将代码阅读为返回字典中的所有项目。您的代码还不错,但我会使用类似于我的第一个解决方案的方法,并在您的应用程序开始时创建一个空订阅 - 这将使所有未来的订阅者都可以观察到,这是一个很好的做法。

以上是关于类似 Replay() 的功能,但具有替换陈旧值的能力?的主要内容,如果未能解决你的问题,请参考以下文章

尝试构建一个表,但不断获取具有索引值的数组

fiddler 简单的接口性能测试replay

复制具有缺失值的行并使用向量替换缺失值

ClubHouse 上线支持Replay功能;WebOBS直播推流工具要流行起来了 |W

Chrome 的“Replay XHR”功能被移除了吗?

Pandas - 在保留原始列的同时创建具有替换值的新列