在订阅回调中结束订阅

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在订阅回调中结束订阅相关的知识,希望对你有一定的参考价值。

码:

            subscription = source
                // close websocket eventually
                .Finally(() => webSocket.CloseAsync(WebSocketCloseStatus.Empty, String.Empty, CancellationToken.None).Wait())
                .Subscribe(
                    data =>
                    {
                        if (webSocket.State != WebSocketState.Open)
                        {
                            _logger.LogWarning("Websocket closed by client!");
                            // TODO: End subscription
                        }

                        try
                        {
                            webSocket.SendAsync(data.ToString(),
                                WebSocketMessageType.Text, true, CancellationToken.None).Wait();
                        }
                        catch (WebSocketException e)
                        {
                            _logger.LogWarning(e, "problem with websocket!");
                            // TODO: End subscription
                        }
                    });

说明:

  • 聆听“来源”中的事件
  • 收到数据 - >发送websocket
  • 在异常或websocket closed =>应该结束对“source”的订阅并确保websocket已关闭(如果尚未关闭)。怎么样??

实现这一目标的一种方法是让异常不被处理,这将使观察者停止观察并“最终”调用,但问题是它然后崩溃我的整个服务器,因为异常在某个后台线程上重新抛出并且未处理。

答案

要在回调中结束订阅,您只需在订阅上调用Dispose()

 IDisposable subscription = null;

 ...


 subscription = source
                // close websocket eventually
                .Finally(() => webSocket.CloseAsync(WebSocketCloseStatus.Empty, String.Empty, CancellationToken.None).Wait())
                .Subscribe(
                    data =>
                    {
                        if (webSocket.State != WebSocketState.Open)
                        {
                            _logger.LogWarning("Websocket closed by client!");
                            subscription.Dispose(); // End subscription
                        }

                        try
                        {
                            webSocket.SendAsync(data.ToString(),
                                WebSocketMessageType.Text, true, CancellationToken.None).Wait();
                        }
                        catch (WebSocketException e)
                        {
                            _logger.LogWarning(e, "problem with websocket!");
                            subscription.Dispose(); // End subscription
                        }
                    });

另见http://www.introtorx.com/content/v1.0.10621.0/03_LifetimeManagement.html#Unsubscribing

另一答案

你真的应该尝试像这样定义你的observable:

var subscription =
    source
        .TakeWhile(data => webSocket.State == WebSocketState.Open)
        .SelectMany(data =>
            Observable.FromAsync(() =>
                webSocket.SendAsync(data.ToString(), WebSocketMessageType.Text, true, CancellationToken.None)))
        .Catch<WebSocketException>(ex =>
            Observable.FromAsync(() =>
                webSocket.CloseAsync(WebSocketCloseStatus.Empty, String.Empty, CancellationToken.None)))
        .Subscribe();

它最终会自动取消订阅。

即使你可以,你也应该避免在观察者中取消订阅。

以上是关于在订阅回调中结束订阅的主要内容,如果未能解决你的问题,请参考以下文章

如何检查组件是不是已卸载到功能组件中?

javascript UV Index Monitor App订阅PubNub并显示UV索引值。博文的代码片段。在这里查看项目:https:// githu

如何取消订阅RxKotlin / RxJava中的Flowable?

c_cpp UV Index Indicator订阅PubNub并使用颜色显示UV索引值。博文的代码片段。在这里查看项目:https:/

Android 应用内结算订阅状态更改回调

订阅的节会触发回调吗?