带序列超时提示的Observale

Posted henreash

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了带序列超时提示的Observale相关的知识,希望对你有一定的参考价值。

Observable.Timeout扩展提供了序列超时的实现,但不够灵活,超时后就异常导致订阅失效。自定义的Observable更加灵活。 

IDisposable customerTimeoutSubscriber;

        private void button10_Click(object sender, EventArgs e)
       
            var cnt = 0;
            var observable = Observable.Create<Tuple<int, bool, string>>(ob =>
           
                var timeoutEvent = new ManualResetEvent(false);
                Task.Factory.StartNew(() =>
               
                    while (true)
                   
                        var isOk = timeoutEvent.WaitOne(2500);
                        if (cancelSource.Token.IsCancellationRequested)
                            break;
                        if (!isOk)
                       
                            ob.OnNext(new Tuple<int, bool, string>(cnt, false, "序列超时"));
                            //Interlocked.Increment(ref cnt);
                       
                        timeoutEvent.Reset();
                   
                );
                while (true)
               
                    if (cancelSource.Token.IsCancellationRequested)
                        break;
                    ob.OnNext(new Tuple<int, bool, string>(cnt, false, DateTime.Now.ToString()));
                    Interlocked.Increment(ref cnt);
                    timeoutEvent.Set();
                    //超时策略
                    var r = new Random().Next(1, 4);
                    Debug.WriteLine($"cnt sleep rs");
                    Thread.Sleep(r * 1000);
               
                ob.OnCompleted();
                return Disposable.Empty;
            );
            //observable = Observable.Timeout(observable, new TimeSpan(0, 0, 2));
            customerTimeoutSubscriber = observable.ObserveOn(new DispatcherScheduler(Dispatcher.CurrentDispatcher)).SubscribeOn(NewThreadScheduler.Default).Subscribe((v) =>
           
                Debug.WriteLine($"v.Item1 v.Item3");
            ,
            ex =>
           
                Debug.WriteLine(ex.Message);
                //cancelSource.Cancel();
            ,
            () => Debug.WriteLine("complate"); );
       

以上是关于带序列超时提示的Observale的主要内容,如果未能解决你的问题,请参考以下文章

rxjs系列 -- Observale与Observer

一文带你搞定TCP重传

C# 多线程,ThreadStart()里面的方法带了参数就提示错误?

Mybatis 批量插入带oracle序列例子+ORA-02287: 此处不允许序号

Flink 自定义触发器实现带超时时间的 countAndTimeTrigger

重构file_get_contents实现一个带超时链接访问的函数