基于OkHttp和RxJava封装的Socket长连接开源库
Posted 郭霖
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于OkHttp和RxJava封装的Socket长连接开源库相关的知识,希望对你有一定的参考价值。
大家早上好!长假将至,相信明天一定有很多朋友已经在回家的路上了,因此今天这篇文章是国庆前的最后一篇文章,从明天开始将不再发文,提前祝大家国庆中秋双节快乐!
本篇来自 huiandroid 的投稿,分享了一个基于 okhttp 和 RxJava 封装的自动重连的 WebSocket,希望大家喜欢!
http://blog.csdn.net/huiAndroid
RxWebSocket 是一个基于 okhttp 和 RxJava 封装的 WebSocket 客户端,此库的核心特点是 除了手动关闭 WebSocket(就是 RxJava取消订阅),WebSocket 在异常关闭的时候(onFailure,发生异常,如WebSocketException等等),会自动重连,永不断连.其次,对 WebSocket 做的缓存处理,同一个 URL,共享一个 WebSocket.
由于是基于 RxJava 封装,所以带来了无限可能,可以和 RxBinding,Rxlifecycle 一起使用,方便对 WebSocket 的管理.
效果图
重连
项目已经上传 Jcenter,依赖方法:
初始化(也可以忽略,直接使用)
如果你想使用自己的 okhttpClient:
是否打印日志:
获取一个 WebSocket,接收消息,多种方式:
发送消息
关闭 WebSocket
项目是依托 RxJava 实现的,所以关闭 WebSocket 的方法也就是在适当的时候注销 Observable,项目里的 demo 里,写了一个简单的lifecycle,将 Observable 生命绑定到 Activity 的 onDestroy,自动注销.代码细节请看 demo,因为内部实现了同一个 URL 的 WebSocket 共享机制,所以当外部所有持有这个 URL 的 Observable 都注销后,这个 WebSocket 连接就会自动关闭.请看原理解析部分.下面两种常用注销方法:
首先需要将 okhttp 的 WebSocket 包装成 Observable,由于需要将WebSocket,Stringmsg,ByteString 等信息一同发送给观察者所以先构建一个 WebSocketInfo 类,将信息封装:
onOpen 字段主要用来判断当前的这个 WebSocketInfo 是否是当 WebSocket 打开时发送的消息(onOpen),这时,Stringmsg 和 ByteString 都是 null.
将 WebSocketInfo 包装成 Observable 发出
实现一个 WebSocketOnSubscribe 将 WebSocket 的回调转化成 subscriber 调用.发送给Observable下游.在 onOpen 时调用 subscriber.onStart(),并且发送一个 onOpen 的WebSocketInfo.在 subscriber 注销的时候关闭 WebSocket.在 call 方法最上面有个 SystemClock.sleep(2000) ,这个主要是为了降低在断连的时候的重连频率,将在下面讲到.
包装成 Observable:
实现自动重连
RxJava retry 操作符,很完美的实现了这个功能,当上游发出 Throwable 的时候,retry 将错误吃掉,并重新调用 onSubscribe 的 call 方法,也就是 WebSocketOnSubscribe 的 call,就会重新初始化一个 WebSocket 连接,达到重连的目的,如果一直没有网络,这个 retry 的调用频率非常高,所以在 call 方法里面,当是重连的时候,就 SystemClock.sleep(2000),休眠2秒,这样重连的频率就是2秒重连一次. 当然在 retry 上面还有一个timeout操作符.当 subscriber.onNext()在指定时间间隔里没有调用,就发出一个 timeoutException ,让 retry 重连 WebSocket .这个主要是为了适配部分国产机型,当 WebSocket 发生连接异常时,不会及时发出错误,如小米平板.在每次重连都会把原来的 WebSocket 关闭.
实现同一个 URL 的 WebSocket 共享
实现共享功能,主要是为了防止一个 URL 的 WebSocket ,建立多个连接,这个主要是由RxJava 的 share 操作符实现,share操作符,使得一个 Observable 可以有多个 subscriber ,当有多个 subscriber 时,当所有的 subscriber 都取消订阅,这个 Observable 才会取消订阅. getWebSocketInfo() 方法完整代码:
doOnUnsubscribe 作用:在 Observable 注销,即 WebSocket 关闭时,移除 map 中的缓存的Observable 和 WebSocket.
doOnNext 作用: 判断接收到的 WebSocketInfo 是否是 WebSocket 在 onOpen 的时候发的,然后将其缓存起来.作用就是:如果有一个相同的 URL 订阅 Observable,就从缓存中取,这个时候我们应该把一个 WebSocket 的 onOpen 事件也发给这个订阅者:
这样的话,同一个 URL 的 WebSocket,不管在什么地方什么时间订阅,都能收到一个 onOpen 事件,外部表现的就像一个新的 WebSocket.
getWebSocketInfo 方法的几种变体:
send 信息到服务端
上面已经讲到 WebSocketInfo 包含了 WebSocket,所以在订阅后,就可以拿到这个 WebSocket 引用就可以 WebSocket.send 发送消息到服务端.当然我们的 RxWebSocketUtil 已经将开启的 WebSocket 已经缓存.所以我们也可以这样发消息:
当指定的 URL 的 WebSocket 没有打开会直接报错.
异步发送消息到服务端
这两种发送方式,你不用关心 URL 的 WebSocket 是否打开,可以直接发送.实现思路也很简单, getWebSocket(url) 会获取到 Observable ,或者是从缓存中取,或者是重新开启一个 WebSocket ,但你都不需要关心,经过 first 操作符后,如果是从缓存取的 Observable ,就注销的当前的 Observable,当是新开的 WebSocket ,注销掉当前的 subscriber 后,就没有其他 subscriber 了,这个新开的 WebSocket 就会关闭( share 操作符作用).
https://github.com/dhhAndroid/RxWebSocket
如果对你有帮助,谢谢 star !
以上是关于基于OkHttp和RxJava封装的Socket长连接开源库的主要内容,如果未能解决你的问题,请参考以下文章
基于Retrofit+RxJava 封装 Leopard 网络框架
RxJava2+Retrofit2+RxLifecycle3+OkHttp3网络请求封装(动态演示)
网络框架封装(retrofit2+rxjava2+okhttp3)