类似 Replay() 的功能,但具有替换陈旧值的能力?
Posted
技术标签:
【中文标题】类似 Replay() 的功能,但具有替换陈旧值的能力?【英文标题】:Replay()-like functionality but with the ability to displace stale values? 【发布时间】:2015-08-21 10:29:52 【问题描述】:想知道是否有人能想到这个用例的优雅解决方案:
我正在使用一个 observable(TEntity 类型的 IObservable),它为我提供了一个实体流。如果这些实体中的任何一个被更新,那么 observable 的提供者将下推更新的实体。
我在这个流上使用了 Replay(),这样我只需要订阅底层流一次,这样迟到的订阅者就可以看到所有的值。问题是这里可能存在内存泄漏,因为 Replay() 将保留它看到的所有更新,而我只需要每个实体的最新更新。
我可以用 Scan() 替换 Replay() ,它允许我只维护最新更新,但是我必须推出迄今为止观察到的所有更新的字典,而不仅仅是特定实体变了。
我能想到的唯一解决方案是使用上面的 Scan(),但在 Scan() 实现中,我会将所有更新推送到主题中。我将公开的 IObservable 的订阅者将收到存储在 Scan() 字典中的快照的合并以及任何更新,如下所示:
private Subject<Entity> _updateSubject = new Subject<Entity>();
private IObservable<Dictionary<string, Entity>> _subscriptionStream;
//called once on initialisation
private void CreateSubscription()
IObservable<Entity> source = GetSomeLongRunningSubscriptionStream();
_subscriptionStream = source
.Scan(new Dictionary<string, Entity>(), (accumulator,update) =>
accumulator[update.ID] = update;
_updateSubject.OnNext(update);
return accumulator;
)
.Replay(1);
//called each time a consumer wants access to the stream
public IObservable<Entity> GetStream()
return _subscriptionStream.Take(1).SelectMany(x => x).Select(x => x.Value)
.Merge(_updateSubject.AsObservable());
谁能想到一个更优雅的解决方案,将状态保存在单个流中,而不是诉诸主题?
谢谢
**************编辑**************
根据我的评论,我采用了类似的方法。让我知道你的想法
//called once on initialisation
private void CreateSubscription()
_baseSubscriptionObservable = GetSomeLongRunningSubscriptionStream ().Publish();
_snapshotObservable = _baseSubscriptionObservable
.Scan(new Dictionary<string,Entity>(), (accumulator, update) =>
accumulator[update.ID] = update;
return accumulator;
)
.StartWith(new Dictionary<string, Entity>())
.Replay(1);
_baseSubscriptionObservable.Connect ();
_snapshotObservable.Connect ();
public IObservable<Entity> GetStream()
return _snapshotObservable.Take (1).Select (x => x.Values).SelectMany (x => x)
.Merge (_baseSubscriptionObservable);
【问题讨论】:
想一想,也许我可以将 2 个已发布的流合并在一起。一个可以是上面发出 Scan/replay(1) 的 Snapshot,另一个可以是提供实时更新的 connect()。 【参考方案1】:我通常喜欢你所做的,但我可以看到一些问题。
首先,您将CreateSubscription
和GetStream
拆分为两种方法,您的想法是您将拥有一个对GetSomeLongRunningSubscriptionStream()
流的底层订阅。不幸的是,在这种情况下,无论您获得多少订阅到最终的 observable,您都将拥有零订阅,因为 .Replay(1)
返回一个 IConnectableObservable<>
,您需要调用 .Connect()
来开始值流。
接下来是用最新值更新累加器,然后在 GetStream
中添加最新值,同时合并到累加器的扁平流中。您每次都会返回两次最新值。
我建议你这样做:
private IObservable<IList<Timestamped<Entity>>> GetStream()
return
Observable
.Create<IList<Timestamped<Entity>>>(o =>
GetSomeLongRunningSubscriptionStream()
.Timestamp()
.Scan(
new Dictionary<string, Timestamped<Entity>>(),
(accumulator, update) =>
accumulator[update.Value.ID] = update;
return accumulator;
)
.Select(x => x.Select(y => y.Value).ToList())
.Replay(1)
.RefCount()
.Subscribe(o));
在使用 Rx 时最好避免任何状态(不在可观察对象中本地化)。所以我将CreateSubscription
和GetStream
合并到一个GetStream
方法中,并将整个可观察对象封装到Observable.Create
中。
为了避免两次推送值并帮助您了解最新更新是什么,我添加了对 .Timestamp()
的调用,以输入返回 Entity
的最新时间。
我在字典中保留了.Scan(...)
,但现在它是Dictionary<string, Timestamped<Entity>>
。
对于每个添加/更新的值,我会展平字典并将基础值作为列表返回。此时,您可以对列表进行排序,以确保最新的值是第一个或最后一个以满足您的需要。
然后,我使用.Replay(1).RefCount()
组合将.Replay(1)
返回的IConnectableObservable<>
转换回IObservable<>
,并理解当所有订阅者都处置时,您将处置基础订阅。这可能是您的查询中最关键的部分。应该这样做。这是确保避免内存泄漏的 Rx 方式。
如果您迫切需要保持底层连接打开,那么您需要将所有代码封装在一个实现 IDisposable
的类中,以清理您需要的 .Connect()
。
类似这样的:
public class EntityStream : IDisposable
private IDisposable _connection = null;
public EntityStream(IObservable<Entity> someLongRunningSubscriptionStream)
_stream =
someLongRunningSubscriptionStream
.Timestamp()
.Scan(
new Dictionary<string, Timestamped<Entity>>(),
(accumulator, update) =>
accumulator[update.Value.ID] = update;
return accumulator;
)
.Select(x => x.Select(y => y.Value).ToList())
.Replay(1);
_connection = _stream.Connect();
private IConnectableObservable<IList<Timestamped<Entity>>> _stream = null;
public IObservable<IList<Timestamped<Entity>>> GetStream()
return _stream.AsObservable();
public void Dispose()
if (_connection != null)
_connection.Dispose();
_connection = null;
我很少这样做。我会彻底推荐做第一种方法。您应该只在必要时混合 OOP 和 Rx。
如果您需要任何说明,请告诉我。
【讨论】:
感谢您的详细回复。首先,您正确地指出,无论订阅者数量如何,都会创建此订阅。幸运的是,在这种情况下,这个特定的订阅是整个应用程序的驱动程序,因此绝对需要在启动时创建它,并且它会在应用程序的整个生命周期内持续存在。此外,除非我遗漏了什么,否则值不会被推送两次,因为 Take(1) 可确保新订阅者仅在该时间点获得当前快照,并与更新合并。 对于您的解决方案,您似乎每次都在推出整个列表?就我而言,这并没有什么意义,因为对于每次更新,消费者只需要担心特定的更新,而不是他们已经观察到的整个快照。 +1 讨论如何处理这个问题,非常好的建议!我在示例中省略了这一点。 (尽管如上所述,订阅会在应用程序的整个生命周期内持续存在。因此在关闭时会被处理) 我已经用一个稍微干净的解决方案更新了我的问题,如果你有任何想法,我将不胜感激。谢谢 @Chris - 我有点错了 - 只有第一个产生的值是重复的。我测试了原始代码。我确实错过了Take(1)
的重要性。每次添加某些内容时,我都会将代码阅读为返回字典中的所有项目。您的代码还不错,但我会使用类似于我的第一个解决方案的方法,并在您的应用程序开始时创建一个空订阅 - 这将使所有未来的订阅者都可以观察到,这是一个很好的做法。以上是关于类似 Replay() 的功能,但具有替换陈旧值的能力?的主要内容,如果未能解决你的问题,请参考以下文章