卡在 Rx Observable SelectMany

Posted

技术标签:

【中文标题】卡在 Rx Observable SelectMany【英文标题】:Stuck with Rx Observable SelectMany 【发布时间】:2014-07-09 12:52:24 【问题描述】:

我的目标是通过 ftp 下载文件并以某种方式异步处理它们。 我将文件列表转换为 IObservable 并使用 SelectMany 组合器对其进行处理。里面有一些操作来下载被阻塞的文件:尝试下载重试次数的文件并返回 Tuple,或者在失败的情况下返回 Tuple 并将其包装到 Observable 中。 “延迟后可观察到的可重试”示例我采用了there 并对其进行了轻微修改。 问题是我的代码在下载几个文件后随机停止。有时它会在“订阅”方法中到达“OnNext”回调。我从未检测到代码到达“OnComplete”回调。也没有抛出异常。

        files.ToObservable().SelectMany(f =>
        
            var source = Observable.Defer(() => Observable.Start(() =>
            
                ftpConnection.DownloadFile(avroPath, f.Name);
                return Tuple.Create(true, f.Name);
            ));
            int attempt = 0;
            return Observable.Defer(() => ((++attempt == 1)
                ? source
                : source.DelaySubscription(TimeSpan.FromSeconds(1))))
                .Retry(4)
                .Catch(Observable.Return(Tuple.Create(false, f.Name)));
        ).Subscribe(
            res =>
            
                Console.Write("Damn, its only rarely gets there, however some files were downloaded succesfully");
                if (res.Item1) Process(res.Item2);
                else LogOrQueueOrWhatever(res.Item2);
            ,
            (Exception ex) =>
            
                Console.Write("Never was thrown");
            ,
            () =>
            
                Console.Write("Never entered this section");
                ProcessLogs();
                ScheduleNExtDownloadRoutine();
            );

如果有人能用更惯用的方式来处理 Observables 上的组合器,我将不胜感激。

【问题讨论】:

在您给它足够的时间异步下载所有文件之前,您的应用程序是否已经退出(Subscribe 不会阻塞当前线程)? 正是如此。发消息后才意识到,所以我是新手。 仅供参考,请参阅 Retry 运算符的此问题:***.com/questions/24655590/… 【参考方案1】:

正如 Brandon 提到的,在定义 observable 的行为后没有同步/阻塞。所以我通过用“ForEachAsync”替换“Subscribe”调用来处理它,将 Observable 转换为 Task 并用 Tasks 的“Wait”方法阻止调用者:

    files.ToObservable().SelectMany(f =>
    
        var source = Observable.Defer(() => Observable.Start(() =>
        
            ftpConnection.DownloadFile(avroPath, f.Name);
            return Tuple.Create(true, f.Name);
        ));
        int attempt = 0;
        return Observable.Defer(() => ((++attempt == 1)
            ? source
            : source.DelaySubscription(TimeSpan.FromSeconds(1))))
            .Retry(4)
            .Catch(Observable.Return(Tuple.Create(false, f.Name)));
    ).ForEachAsync(res =>
    
        if (res.Item1) Process(res.Item2);
        else LogOrQueueOrWhatever(res.Item2);
    ).Wait();

    ProcessLogs();
    ScheduleNExtDownloadRoutine();

【讨论】:

以上是关于卡在 Rx Observable SelectMany的主要内容,如果未能解决你的问题,请参考以下文章

如何声明 Rx.Observable.fromPromise() 的 TypeScript 返回类型

rx 的 Observable.FromEventPattern 的 TPL 等价物是啥?

Rx.Observable subscribe 和 forEach 有啥区别

rx.js Observable 剖析(创建,订阅,执行,清理 )

Spring Boot 2 - 将 Mono 转换为 rx.Observable?

入门Rx-Observable的创建方式