在 OnNext 运行时,我想删除所有传入的通知,除了最新的
Posted
技术标签:
【中文标题】在 OnNext 运行时,我想删除所有传入的通知,除了最新的【英文标题】:While the OnNext is running, I want to drop all incoming notifications except from the latest 【发布时间】:2021-11-26 19:26:23 【问题描述】:我有一些可观察的,随着时间的推移会产生物品。项目之间的间隔几乎是随机的。我订阅了我的 observable 和 process 项目,每个进程都可能需要很多时间,当OnNext
结束其工作时,会产生许多新项目。我只需要拿最后一件物品并处理它。因此,每当OnNext
操作完成时,我都需要对上一次OnNext
运行期间生成的项目中的最新项目执行OnNext
。
我可以用 Rx 做吗?
我尝试了Window(1).Switch()
,但它似乎在项目到来时立即执行,而不是在OnNext
完成时执行。
【问题讨论】:
“我有一些观察者,随着时间的推移产生物品。” OnNext 调用,则调用此方法的 observable 也将被阻塞,并且在此期间它将无法发出任何其他OnNext
。根据RX contract,必须对OnNext
通知进行序列化。所以我不确定如何回答您的问题。
也许您正在寻找的是ExhaustMap
运算符。
是的,对不起,我的意思是可观察的,你说得对,已修复。 OnNext 被阻止,我知道,但 observable 仍然有新项目。而且我不需要处理所有这些,只需要处理 prevoius OnNext 完成时的最后一项。
如果我做对了,ExhaustMap 将忽略所有项目,但我需要最后一个项目。
查看这个问题:How to merge a nested observable IObservable<IObservable<T>> with limited concurrency and limited buffer capacity? 如果您重构代码以使用Merge
运算符,则可以将Merge
替换为在此找到的MergeBounded
实现之一问题,maximumConcurrency: 1
和 boundedCapacity: 1
。
【参考方案1】:
这是一个相对简单的自定义DroppingDo
运算符,它可能满足您的需求。它与内置的Do
操作符有些相似,不同之处在于它在ThreadPool
而不是当前线程上调用操作,并且它忽略在前一个操作运行时接收到的项目。不过,最新的项目会被保留。
/// <summary>
/// Invokes an action sequentially for each element in the observable sequence,
/// on the specified scheduler, skipping and dropping elements that are received
/// during the execution of a previous action, except from the latest element.
/// </summary>
public static IObservable<TSource> DroppingDo<TSource>(
this IObservable<TSource> source,
Action<TSource> action,
IScheduler scheduler = null)
// Arguments validation omitted
scheduler ??= Scheduler.Default;
return Observable.Defer(() =>
Tuple<TSource> latest = null;
return source
.Select(item =>
var previous = Interlocked.Exchange(ref latest, Tuple.Create(item));
if (previous != null) return Observable.Empty<TSource>();
return Observable.Defer(() =>
var current = Interlocked.Exchange(ref latest, null);
Debug.Assert(current != null);
var unboxed = current.Item1;
return Observable.Start(
() => action(unboxed); return unboxed; , scheduler);
);
)
.Concat();
);
使用示例。只需替换您可能看起来像这样的代码:
someObservable
.Subscribe(x => Process(x), ex => HandleError(ex));
有了这个:
someObservable
.DroppingDo(x => Process(x))
.Subscribe(_ => , ex => HandleError(ex));
【讨论】:
注意,当前版本的 Rx 库 (5.0.0) 中的Concat
运算符 behaves wierdly。我的建议是改用等效的Merge(1)
运算符,直到Concat
的问题得到解决。 1
是maxConcurrent
参数的值。将此参数设置为1
表示不并发。以上是关于在 OnNext 运行时,我想删除所有传入的通知,除了最新的的主要内容,如果未能解决你的问题,请参考以下文章