在 OnNext 运行时,我想删除所有传入的通知,除了最新的

Posted

技术标签:

【中文标题】在 OnNext 运行时,我想删除所有传入的通知,除了最新的【英文标题】:While the OnNext is running, I want to drop all incoming notifications except from the latest 【发布时间】:2021-11-26 19:26:23 【问题描述】:

我有一些可观察的,随着时间的推移会产生物品。项目之间的间隔几乎是随机的。我订阅了我的 observable 和 process 项目,每个进程都可能需要很多时间,当OnNext 结束其工作时,会产生许多新项目。我只需要拿最后一件物品并处理它。因此,每当OnNext 操作完成时,我都需要对上一次OnNext 运行期间生成的项目中的最新项目执行OnNext。 我可以用 Rx 做吗?

我尝试了Window(1).Switch(),但它似乎在项目到来时立即执行,而不是在OnNext 完成时执行。

【问题讨论】:

“我有一些观察者,随着时间的推移产生物品。” OnNext 调用,则调用此方法的 observable 也将被阻塞,并且在此期间它将无法发出任何其他 OnNext。根据RX contract,必须对OnNext 通知进行序列化。所以我不确定如何回答您的问题。 也许您正在寻找的是ExhaustMap 运算符。 是的,对不起,我的意思是可观察的,你说得对,已修复。 OnNext 被阻止,我知道,但 observable 仍然有新项目。而且我不需要处理所有这些,只需要处理 prevoius OnNext 完成时的最后一项。 如果我做对了,ExhaustMap 将忽略所有项目,但我需要最后一个项目。 查看这个问题:How to merge a nested observable IObservable<IObservable<T>> with limited concurrency and limited buffer capacity? 如果您重构代码以使用Merge 运算符,则可以将Merge 替换为在此找到的MergeBounded 实现之一问题,maximumConcurrency: 1boundedCapacity: 1 【参考方案1】:

这是一个相对简单的自定义DroppingDo 运算符,它可能满足您的需求。它与内置的Do 操作符有些相似,不同之处在于它在ThreadPool 而不是当前线程上调用操作,并且它忽略在前一个操作运行时接收到的项目。不过,最新的项目会被保留。

/// <summary>
/// Invokes an action sequentially for each element in the observable sequence,
/// on the specified scheduler, skipping and dropping elements that are received
/// during the execution of a previous action, except from the latest element.
/// </summary>
public static IObservable<TSource> DroppingDo<TSource>(
    this IObservable<TSource> source,
    Action<TSource> action,
    IScheduler scheduler = null)

    // Arguments validation omitted
    scheduler ??= Scheduler.Default;
    return Observable.Defer(() =>
    
        Tuple<TSource> latest = null;
        return source
            .Select(item =>
            
                var previous = Interlocked.Exchange(ref latest, Tuple.Create(item));
                if (previous != null) return Observable.Empty<TSource>();
                return Observable.Defer(() =>
                
                    var current = Interlocked.Exchange(ref latest, null);
                    Debug.Assert(current != null);
                    var unboxed = current.Item1;
                    return Observable.Start(
                        () =>  action(unboxed); return unboxed; , scheduler);
                );
            )
            .Concat();
    );

使用示例。只需替换您可能看起来像这样的代码:

someObservable
    .Subscribe(x => Process(x), ex => HandleError(ex));

有了这个:

someObservable
    .DroppingDo(x => Process(x))
    .Subscribe(_ =>  , ex => HandleError(ex));

【讨论】:

注意,当前版本的 Rx 库 (5.0.0) 中的 Concat 运算符 behaves wierdly。我的建议是改用等效的Merge(1) 运算符,直到Concat 的问题得到解决。 1maxConcurrent 参数的值。将此参数设置为1 表示不并发。

以上是关于在 OnNext 运行时,我想删除所有传入的通知,除了最新的的主要内容,如果未能解决你的问题,请参考以下文章

当应用程序进入暂停状态时取消本地通知。(从后台删除)

从 Parse.com 查询或保存传入的推送通知

有没有办法在新通知到达时删除已发送的通知?

如何在接收 Firefox 推送通知时删除缓存

尝试根据标识符删除多个本地通知

将传入的whatsapp通知存储到本地数据库