Websocket实现即时通讯

Posted BarryWang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Websocket实现即时通讯相关的知识,希望对你有一定的参考价值。

前言

关于我和WebSocket的缘:我从大二在计算机网络课上听老师讲过之后,第一次使用就到了毕业之后的第一份工作。直到最近换了工作,到了一家是含有IM社交聊天功能的app的时候,我觉得我现在可以谈谈我对WebSocket/Socket的一些看法了。要想做IM聊天app,就不得不理解WebSocket和Socket的原理了,听我一一道来。

目录

1.WebSocket使用场景

2.WebSocket诞生由来

3.谈谈WebSocket协议原理

4.WebSocket 和 Socket的区别与联系

5.ios平台有哪些WebSocket和Socket的开源框架

6.iOS平台如何实现WebSocket协议

一.WebSocket的使用场景

1.社交聊天

最著名的就是微信,QQ,这一类社交聊天的app。这一类聊天app的特点是低延迟,高即时。即时是这里面要求最高的,如果有一个紧急的事情,通过IM软件通知你,假设网络环境良好的情况下,这条message还无法立即送达到你的客户端上,紧急的事情都结束了,你才收到消息,那么这个软件肯定是失败的。

2.弹幕

说到这里,大家一定里面想到了A站和B站了。确实,他们的弹幕一直是一种特色。而且弹幕对于一个视频来说,很可能弹幕才是精华。发弹幕需要实时显示,也需要和聊天一样,需要即时。

3.多玩家游戏

4.协同编辑

现在很多开源项目都是分散在世界各地的开发者一起协同开发,此时就会用到版本控制系统,比如Git,SVN去合并冲突。但是如果有一份文档,支持多人实时在线协同编辑,那么此时就会用到比如WebSocket了,它可以保证各个编辑者都在编辑同一个文档,此时不需要用到Git,SVN这些版本控制,因为在协同编辑界面就会实时看到对方编辑了什么,谁在修改哪些段落和文字。

5.股票基金实时报价

金融界瞬息万变——几乎是每毫秒都在变化。如果采用的网络架构无法满足实时性,那么就会给客户带来巨大的损失。几毫秒钱股票开始大跌,几秒以后才刷新数据,一秒钟的时间内,很可能用户就已经损失巨大财产了。

6.体育实况更新

全世界的球迷,体育爱好者特别多,当然大家在关心自己喜欢的体育活动的时候,比赛实时的赛况是他们最最关心的事情。这类新闻中最好的体验就是利用Websocket达到实时的更新!

7.视频会议/聊天

视频会议并不能代替和真人相见,但是他能让分布在全球天涯海角的人聚在电脑前一起开会。既能节省大家聚在一起路上花费的时间,讨论聚会地点的纠结,还能随时随地,只要有网络就可以开会。

8.基于位置的应用

越来越多的开发者借用移动设备的GPS功能来实现他们基于位置的网络应用。如果你一直记录用户的位置(比如运行应用来记录运动轨迹),你可以收集到更加细致化的数据。

9.在线教育

在线教育近几年也发展迅速。优点很多,免去了场地的限制,能让名师的资源合理的分配给全国各地想要学习知识的同学手上,Websocket是个不错的选择,可以视频聊天、即时聊天以及其与别人合作一起在网上讨论问题...

10.智能家居

这也是我一毕业加入的一个伟大的物联网智能家居的公司。考虑到家里的智能设备的状态必须需要实时的展现在手机app客户端上,毫无疑问选择了Websocket。

11.总结

从上面我列举的这些场景来看,一个共同点就是,高实时性!

二.WebSocket诞生由来

1.最开始的轮询Polling阶段

技术分享

这种方式下,是不适合获取实时信息的,客户端和服务器之间会一直进行连接,每隔一段时间就询问一次。客户端会轮询,有没有新消息。这种方式连接数会很多,一个接受,一个发送。而且每次发送请求都会有Http的Header,会很耗流量,也会消耗CPU的利用率。

2.改进版的长轮询Long polling阶段

技术分享

长轮询是对轮询的改进版,客户端发送HTTP给服务器之后,有没有新消息,如果没有新消息,就一直等待。当有新消息的时候,才会返回给客户端。在某种程度上减小了网络带宽和CPU利用率等问题。但是这种方式还是有一种弊端:例如假设服务器端的数据更新速度很快,服务器在传送一个数据包给客户端后必须等待客户端的下一个Get请求到来,才能传递第二个更新的数据包给客户端,那么这样的话,客户端显示实时数据最快的时间为2×RTT(往返时间),而且如果在网络拥塞的情况下,这个时间用户是不能接受的,比如在股市的的报价上。另外,由于http数据包的头部数据量往往很大(通常有400多个字节),但是真正被服务器需要的数据却很少(有时只有10个字节左右),这样的数据包在网络上周期性的传输,难免对网络带宽是一种浪费。

3.WebSocket诞生

现在急需的需求是能支持客户端和服务器端的双向通信,而且协议的头部又没有HTTP的Header那么大,于是,Websocket就诞生了!

技术分享

上图就是Websocket和Polling的区别,从图中可以看到Polling里面客户端发送了好多Request,而下图,只有一个Upgrade,非常简洁高效。至于消耗方面的比较就要看下图了

技术分享

上图中,我们先看蓝色的柱状图,是Polling轮询消耗的流量,

Use case A: 1,000 clients polling every second: Network throughput is (871 x 1,000) = 871,000 bytes = 6,968,000 bits per second (6.6 Mbps)

Use case B: 10,000 clients polling every second: Network throughput is (871 x 10,000) = 8,710,000 bytes = 69,680,000 bits per second (66 Mbps)

Use case C: 100,000 clients polling every 1 second: Network throughput is (871 x 100,000) = 87,100,000 bytes = 696,800,000 bits per second (665 Mbps)

而Websocket的Frame是 just two bytes of overhead instead of 871,仅仅用2个字节就代替了轮询的871字节!

Use case A: 1,000 clients receive 1 message per second: Network throughput is (2 x 1,000) = 2,000 bytes = 16,000 bits per second (0.015 Mbps)

Use case B: 10,000 clients receive 1 message per second: Network throughput is (2 x 10,000) = 20,000 bytes = 160,000 bits per second (0.153 Mbps)

Use case C: 100,000 clients receive 1 message per second: Network throughput is (2 x 100,000) = 200,000 bytes = 1,600,000 bits per second (1.526 Mbps)

相同的每秒客户端轮询的次数,当次数高达10W/s的高频率次数的时候,Polling轮询需要消耗665Mbps,而Websocket仅仅只花费了1.526Mbps,将近435倍!!

三.谈谈WebSocket协议原理

Websocket是应用层第七层上的一个应用层协议,它必须依赖 HTTP 协议进行一次握手 ,握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。

Websocket的数据传输是frame形式传输的,比如会将一条消息分为几个frame,按照先后顺序传输出去。这样做会有几个好处:

1)大数据的传输可以分片传输,不用考虑到数据大小导致的长度标志位不足够的情况。

2)和http的chunk一样,可以边生成数据边传递消息,即提高传输效率。

技术分享

四.WebSocket 和 Socket的区别与联系

首先,Socket 其实并不是一个协议。它工作在 OSI 模型会话层(第5层),是为了方便大家直接使用更底层协议(一般是 TCP 或 UDP )而存在的一个抽象层。Socket是对TCP/IP协议的封装,Socket本身并不是协议,而是一个调用接口(API)。

技术分享

Socket通常也称作”套接字”,用于描述IP地址和端口,是一个通信链的句柄。网络上的两个程序通过一个双向的通讯连接实现数据的交换,这个双向链路的一端称为一个Socket,一个Socket由一个IP地址和一个端口号唯一确定。应用程序通常通过”套接字”向网络发出请求或者应答网络请求。

Socket在通讯过程中,服务端监听某个端口是否有连接请求,客户端向服务端发送连接请求,服务端收到连接请求向客户端发出接收消息,这样一个连接就建立起来了。客户端和服务端也都可以相互发送消息与对方进行通讯,直到双方连接断开。

所以基于WebSocket和基于Socket都可以开发出IM社交聊天类的app

五.iOS平台有哪些WebSocket和Socket的开源框架

Socket开源框架有:CocoaAsyncSocketsocketio/socket.io-client-swift

WebSocket开源框架有:facebook/SocketRockettidwall/SwiftWebSocket

六.iOS平台如何实现WebSocket协议

Talk is cheap。Show me the code ——Linus Torvalds

我们今天来看看facebook/SocketRocket的实现方法

首先这是SRWebSocket定义的一些成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@property (nonatomic, weak) id  delegate;
/**
 A dispatch queue for scheduling the delegate calls. The queue doesn‘t need be a serial queue.
 If `nil` and `delegateOperationQueue` is `nil`, the socket uses main queue for performing all delegate method calls.
 */
@property (nonatomic, strong) dispatch_queue_t delegateDispatchQueue;
/**
 An operation queue for scheduling the delegate calls.
 If `nil` and `delegateOperationQueue` is `nil`, the socket uses main queue for performing all delegate method calls.
 */
@property (nonatomic, strong) NSOperationQueue *delegateOperationQueue;
@property (nonatomic, readonly) SRReadyState readyState;
@property (nonatomic, readonly, retain) NSURL *url;
@property (nonatomic, readonly) CFHTTPMessageRef receivedHTTPHeaders;
// Optional array of cookies (NSHTTPCookie objects) to apply to the connections
@property (nonatomic, copy) NSArray *requestCookies;
// This returns the negotiated protocol.
// It will be nil until after the handshake completes.
@property (nonatomic, readonly, copy) NSString *protocol;

下面这些是SRWebSocket的一些方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Protocols should be an array of strings that turn into Sec-WebSocket-Protocol.
- (instancetype)initWithURLRequest:(NSURLRequest *)request;
- (instancetype)initWithURLRequest:(NSURLRequest *)request protocols:(NSArray *)protocols;
- (instancetype)initWithURLRequest:(NSURLRequest *)request protocols:(NSArray *)protocols allowsUntrustedSSLCertificates:(BOOL)allowsUntrustedSSLCertificates;
// Some helper constructors.
- (instancetype)initWithURL:(NSURL *)url;
- (instancetype)initWithURL:(NSURL *)url protocols:(NSArray *)protocols;
- (instancetype)initWithURL:(NSURL *)url protocols:(NSArray *)protocols allowsUntrustedSSLCertificates:(BOOL)allowsUntrustedSSLCertificates;
// By default, it will schedule itself on +[NSRunLoop SR_networkRunLoop] using defaultModes.
- (void)scheduleInRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
- (void)unscheduleFromRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
// SRWebSockets are intended for one-time-use only.  Open should be called once and only once.
- (void)open;
- (void)close;
- (void)closeWithCode:(NSInteger)code reason:(NSString *)reason;
///--------------------------------------
#pragma mark Send
///--------------------------------------
//下面是4个发送的方法
/**
 Send a UTF-8 string or binary data to the server.
 @param message UTF-8 String or Data to send.
 @deprecated Please use `sendString:` or `sendData` instead.
 */
- (void)send:(id)message __attribute__((deprecated("Please use `sendString:` or `sendData` instead.")));
- (void)sendString:(NSString *)string;
- (void)sendData:(NSData *)data;
- (void)sendPing:(NSData *)data;
@end

对应5种状态的代理方法

1
2
3
4
5
6
7
8
9
10
11
12
///--------------------------------------
#pragma mark - SRWebSocketDelegate
///--------------------------------------
@protocol SRWebSocketDelegate - (void)webSocket:(SRWebSocket *)webSocket didReceiveMessage:(id)message;
@optional
- (void)webSocketDidOpen:(SRWebSocket *)webSocket;
- (void)webSocket:(SRWebSocket *)webSocket didFailWithError:(NSError *)error;
- (void)webSocket:(SRWebSocket *)webSocket didCloseWithCode:(NSInteger)code reason:(NSString *)reason wasClean:(BOOL)wasClean;
- (void)webSocket:(SRWebSocket *)webSocket didReceivePong:(NSData *)pongPayload;
// Return YES to convert messages sent as Text to an NSString. Return NO to skip NSData -> NSString conversion for Text messages. Defaults to YES.
- (BOOL)webSocketShouldConvertTextFrameToString:(SRWebSocket *)webSocket;
@end

didReceiveMessage方法是必须实现的,用来接收消息的。

下面4个did方法分别对应着Open,Fail,Close,ReceivePong不同状态的代理方法

方法就上面这些了,我们实际来看看代码怎么写

先是初始化Websocket连接,注意此处ws://或者wss://连接有且最多只能有一个,这个是Websocket协议规定的

1
2
3
4
self.ws = [[SRWebSocket alloc] initWithURLRequest:[NSURLRequest requestWithURL:[NSURL URLWithString:[NSString 
stringWithFormat:@"%@://%@:%zd/ws", serverProto, serverIP, serverPort]]]];
    self.ws.delegate = delegate;
    [self.ws open];

发送消息

1
[self.ws send:message];

接收消息以及其他3个代理方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//这个就是接受消息的代理方法了,这里接受服务器返回的数据,方法里面就应该写处理数据,存储数据的方法了。
- (void)webSocket:(SRWebSocket *)webSocket didReceiveMessage:(id)message
{
    NSDictionary *data = [NetworkUtils decodeData:message];
    if (!data)
        return;
}
//这里是Websocket刚刚Open之后的代理方法。就想微信刚刚连接中,会显示连接中,当连接上了,就不显示连接中了,取消显示连接的方法就应该写在这里面
- (void)webSocketDidOpen:(SRWebSocket *)webSocket
{
    // Open = silent ping
    [self.ws receivedPing];
}
//这是关闭Websocket的代理方法
- (void)webSocket:(SRWebSocket *)webSocket didCloseWithCode:(NSInteger)code reason:(NSString *)reason wasClean:(BOOL)wasClean
{
    [self failedConnection:NSLS(Disconnected)];
}
//这里是连接Websocket失败的方法,这里面一般都会写重连的方法
- (void)webSocket:(SRWebSocket *)webSocket didFailWithError:(NSError *)error
{
    [self failedConnection:NSLS(Disconnected)];
}

最后

以上就是我想分享的一些关于Websocket的心得,文中如果有错误的地方,欢迎大家指点!一般没有微信QQ那么大用户量的app,用Websocket应该都可以完成IM社交聊天的任务。当用户达到亿级别,应该还有很多需要优化,优化性能各种的吧。

最后,微信和QQ的实现方法也许并不是只用Websocket和Socket这么简单,也许是他们自己开发的一套能支持这么大用户,大数据的,各方面也都优化都最优的方法。如果有开发和微信和QQ的大神看到这篇文章,可以留言说说看你们用什么方式实现的,也可以和我们一起分享,我们一起学习!我先谢谢大神们的指点了!

 

1,Android 客户端使用需要配置网络权限; 

2,需要写一个自己的client类来继承WebsocketClient;实现websocket的状态回调和新消息的解析动作;

3,需要监控自己的client的链接状态,维持长链接;

4,发送和接收

 

下面贴出部分相关代码;

网络权限的不用说了吧!

client类:

常用状态回调方法有4个;

自己可以在对应的函数里面做响应的处理,比如当链接发生错误时要重新去打开该链接,收到消息时即时的保存聊天记录和发送系统通知来提醒用户查看新消息等等;

 

[html] view plain copy
 
  1. <pre name="code" class="html"><span style="font-size:18px;">public class TestClient extends WebSocketClient {  
  2.   
  3.     public TestClient(URI serverURI) {  
  4.         super(serverURI);  
  5.     }  
  6.     /***  
  7.      * 链接关闭  
  8.      */  
  9.     @Override  
  10.     public void onClose(int arg0, String arg1, boolean arg2) {  
  11.   
  12.     }  
  13.     /***  
  14.      * 链接发生错误  
  15.      */  
  16.     @Override  
  17.     public void onError(Exception arg0) {  
  18.   
  19.     }  
  20.     /**  
  21.      * 新消息  
  22.      */  
  23.     @Override  
  24.     public void onMessage(String arg0) {  
  25.   
  26.     }  
  27.     /***  
  28.      * 链接打开  
  29.      */  
  30.     @Override  
  31.     public void onOpen(ServerHandshake arg0) {  
  32.         // TODO Auto-generated method stub  
  33.   
  34.     }  
  35. }  
  36. </span>  



 


下面是我聊天部分代码,有离线消息/PC同步/多对多的聊天;

 

仅供参考;

 

[java] view plain copy
 
  1. /*** 
  2.  * <h2>WebSocket android 客户端</h2> 
  3.  * <ol> 
  4.  * <li>Socket链接打开回调 {@link WebSocket#onOpen(ServerHandshake)},此处有 
  5.  * {@link SocketConstant#ON_OPEN} 广播发出; 
  6.  * <li>Socket链接出现异常错误时回调 {@link WebSocket#onError(Exception)},此处有 
  7.  * {@link SocketConstant#ON_ERROR}广播发出; 
  8.  * <li>Socket链接关闭回调 {@link WebSocket #onClose(int, String, boolean)},此处有 
  9.  * {@link SocketConstant#ON_CLOSES}广播发出; 
  10.  * <li>Socket链接接收消息回调 {@link WebSocket#onMessage(String)} 
  11.  * ,此处做收消息的逻辑的处理;包括发送消息服务器返回的发送结果,PC端同步接收的新消息,及外来的新消息; 
  12.  * <li>检测是否有消息遗漏 {@link WebSocket#checkMsgWebId(String, String)},参数为联系人和webId; 
  13.  * <li>取得正在桌面运行的activity的名称 {@link WebSocket#getRunningActivityName()} 
  14.  * <li>接收到的消息的处理 {@link WebSocket#messageHandle(MessageEntity, String)} 
  15.  * ,参数为消息实体和消息类型 (buyer/server) 
  16.  * <li>发送新消息的系统通知 {@link WebSocket#sendNotification(String)},参数为联系人; 
  17.  * <li>保存离线消息 {@link WebSocket#saveOffLineMsg(HashMap)},参数为接收到的离线消息集合; 
  18.  * <li>保存从服务端获取的联系人的webId {@link WebSocket#saveContactsWebID(HashMap)} 
  19.  * ,参数为以联系人为key以最大webId为值得map集合; 
  20.  * </ol> 
  21.  *  
  22.  * @author li‘mingqi <a> 2014-3-19</a> 
  23.  *  
  24.  */  
  25. public class WebSocket extends WebSocketClient {  
  26.     // 登陆返回的back_type字段  
  27.     public static final String LOGIN_RETURN_TYPE = "login";  
  28.     // 发送信息的back_type字段  
  29.     public static final String SEND_RETURN_TYPE = "send_result";  
  30.     // 接收信息的back_type字段  
  31.     public static final String RECEIVER_RETURN_TYPE = "msg";  
  32.     // 接收客服的信息的back_type字段  
  33.     public static final String GET_SERVER_RETURN_TYPE = "server_info";  
  34.     // 接收服务端返回对应联系人的最大顺序ID  
  35.     public static final String CONTACTS_MAX_WEBID_TYPE = "max_id_return";  
  36.     // 接收用户的离线消息  
  37.     public static final String USER_OFFLINE_MSG_TYPE = "offline";  
  38.     // 上下文对象  
  39.     private Context mContext;  
  40.     // socket返回json解析类对象  
  41.     private WebSocketParser mParser;  
  42.     // 系统通知管理  
  43.     public NotificationManager mNotificationManager;  
  44.     // 系统通知  
  45.     private Notification mNoti;  
  46.     // 意图  
  47.     private PendingIntent mIntent;  
  48.     // 该系统通知的 id  
  49.     public static final int NOTIFICATION_ID = 100;  
  50.   
  51.     @SuppressWarnings("deprecation")  
  52.     public SGWebSocket(Context context, URI serverUri, Draft draft) {  
  53.         super(serverUri, draft);  
  54.         this.mContext = context;  
  55.                 //新消息的解析类  
  56.                 this.mParser = WebSocketParser.getInstance();  
  57.                 //收到新消息发送的通知  
  58.                 this.mNotificationManager = (NotificationManager) this.mContext  
  59.                 .getSystemService(Context.NOTIFICATION_SERVICE);  
  60.         this.mNoti = new Notification(R.drawable.system_info, "您有新消息!",  
  61.                 System.currentTimeMillis());  
  62.     }  
  63.   
  64.     /*** 
  65.      * send broadcast <SGSocketConstant>ON_CLOSES filter if this socket closed 
  66.      * socket 发生关闭时发送的广播,若想提示,可以接受并处理 
  67.      *  
  68.      */  
  69.     @Override  
  70.     public void onClose(int arg0, String arg1, boolean arg2) {  
  71.         // 更改保存的链接状态  
  72.         UserInfoUtil.saveSocket(mContext, false);  
  73.         mNotificationManager.cancelAll();  
  74.         Intent intent = new Intent(SocketConstant.ON_CLOSES);  
  75.         intent.putExtra(SocketConstant.ON_CLOSES, arg1.toString());  
  76.         mContext.sendBroadcast(intent);  
  77.     }  
  78.   
  79.     /*** 
  80.      * send broadcast <SGSocketConstant>ON_ERROR filter if this socket has error 
  81.      * socket 发生错误发送的广播,若想提示,可以接受并处理 
  82.      *  
  83.      */  
  84.     @Override  
  85.     public void onError(Exception arg0) {  
  86.         Intent intent = new Intent(SGSocketConstant.ON_ERROR);  
  87.         intent.putExtra(SGSocketConstant.ON_ERROR, arg0.toString());  
  88.         mContext.sendBroadcast(intent);  
  89.         this.close();  
  90.     }  
  91.   
  92.     // 买家  
  93.     public static final String MSG_BUYER_TYPE = "1";  
  94.     // 客服  
  95.     public static final String MSG_SERVCER_TYPE = "2";  
  96.   
  97.     // 游客  
  98.     // public static final String MSG_RANDOM_TYPE = "3";  
  99.   
  100.     /*** 
  101.      * receiver message from server 1,登陆返回 type 
  102.      * <WebSocket>LOGIN_RETURN_TYPE; 2,发送返回 type 
  103.      * <WebSocket>SEND_RETURN_TYPE; 3,接收信息返回 type 
  104.      * <WebSocket>RECEIVER_RETURN_TYPE; 
  105.      *  
  106.      * @throws InterruptedException 
  107.      */  
  108.     @Override  
  109.     public void onMessage(String content) {  
  110.         // parser  
  111.         try {  
  112.             JSONObject object = new JSONObject(content);  
  113.             Log.i("json", "卖家--" + object.toString());  
  114.             String back_type = object.getString("back_type");  
  115.             String activity = getRunningActivityName();  
  116.             if (SEND_RETURN_TYPE.equals(back_type)) {// 发送具体消息时返回发送结果  
  117.                 // json解析  
  118.                 MessageEntity entity = mParser.sendMessageParser(mContext,  
  119.                         content);  
  120.                 if ("true".equals(entity.getSend_state())) {// 发送成功  
  121.                     // 判断是否是PC端发送的消息,若是PC端发送的消息,则在Android端做同步存储处理  
  122.                     // 1,首先判断数据库中是否包含该条信息  
  123.                     boolean has = MessageDB.getInstance(mContext)  
  124.                             .findMessageByMsgId(entity.get_id());  
  125.                     if (has) {  
  126.                         // Android端发送  
  127.                         MessageDB.getInstance(mContext).update(entity.get_id(),  
  128.                                 true, entity.getReceiverTime(),  
  129.                                 entity.getWebId());// 更新发送状态为已发送  
  130.                     } else {  
  131.                         // PC端发送,将该消息同步到Android端数据库  
  132.                         entity.setSend_state(SocketConstant.MSG_SEND_SUCCESS_STATE);  
  133.                         MessageDB.getInstance(mContext).insert(entity,  
  134.                                 SocketConstant.MSG_TYPE_BUYER);// 卖家发送给买家的  
  135.                         // 通知聊天主页面,更新聊天列表  
  136.                         pcSynAndroid(activity);  
  137.                     }  
  138.                     // 检测是否有消息遗漏  
  139.                     checkMsgWebId(entity.getContacts(), entity.getWebId());  
  140.                     Log.i("miss", "发送返回或者PC同步--" + entity.getContacts() + "--"  
  141.                             + entity.getWebId());  
  142.                 } else if ("false".equals(entity.getSend_state())) {// 发送失败  
  143.                     MessageDB.getInstance(mContext).update(entity.get_id(),  
  144.                             false, entity.getReceiverTime(), entity.getWebId());  
  145.                     Toast.makeText(mContext, entity.getErrorText(),  
  146.                             Toast.LENGTH_SHORT).show();  
  147.                 }  
  148.                 // 登陆返回 记录session  
  149.             } else if (LOGIN_RETURN_TYPE.equals(back_type)) {  
  150.                 KApplication.session = object.getString("session_id");  
  151.                 String str = object.getString("login_status");  
  152.                 if ("true".equals(str)) {  
  153.                     UserInfoUtil.saveSocket(mContext, true);  
  154.                     // 生成json请求字符串  
  155.                     String maxIdstring = SocketJsonUtil  
  156.                             .getContactsCurrentWebId(UserInfoUtil  
  157.                                     .getUser(mContext)[0], "2", MessageDB  
  158.                                     .getInstance(mContext)  
  159.                                     .findAllContactsAndType());  
  160.                     // 登陆成功,向服务器索取联系人的最大webId  
  161.                     send(maxIdstring);  
  162.                     Log.i("send", maxIdstring);  
  163.                 } else if ("false".equals(str)) {  
  164.                     UserInfoUtil.saveSocket(mContext, false);  
  165.                 }  
  166.   
  167.             } else if (RECEIVER_RETURN_TYPE.equals(back_type)) {// 接收到的具体聊天的信息  
  168.                 // json解析  
  169.                 MessageEntity entity = mParser.receiverMessagePrser(mContext,  
  170.                         content);  
  171.                 // 判断数据库中是否有该条消息,有则不处理,无则处理消息;  
  172.                 if (!MessageDB.getInstance(mContext).findMessageByMsgId(  
  173.                         entity.get_id())) {  
  174.                     // 消息处理  
  175.                     if (MSG_BUYER_TYPE.equals(entity.getSenderType())) {  
  176.                         // 买家  
  177.                         messageHandle(entity, SocketConstant.MSG_TYPE_BUYER);  
  178.                     } else if (MSG_SERVCER_TYPE.equals(entity.getSenderType())) {  
  179.                         // 卖家,客服  
  180.                         messageHandle(entity, SocketConstant.MSG_TYPE_SERVER);  
  181.                     }  
  182.                     Log.i("miss", "没有该条消息");  
  183.                     // 检测是否有消息遗漏  
  184.                     checkMsgWebId(entity.getContacts(), entity.getWebId());  
  185.                 }  
  186.             } else if (GET_SERVER_RETURN_TYPE.equals(back_type)) {// 获取闪聊客服返回的数据  
  187.                 // 客服  
  188.                 ServerEntity entity = mParser.serverInfoParser(content);// 客服对象  
  189.                 Intent intent = new Intent(SocketConstant.GET_SERVER_INFO);  
  190.                 intent.putExtra("server_info", entity);  
  191.                 mContext.sendBroadcast(intent);  
  192.             } else if (CONTACTS_MAX_WEBID_TYPE.equals(back_type)) {  
  193.                 // 返回的联系人最大的消息id  
  194.                 HashMap<String, String> map = mParser.contactsMaxWebId(content);  
  195.                 // 将联系人和其最大webId存入临时集合  
  196.                 saveContactsWebID(map);  
  197.                 // 开始请求服务器,释放离线消息给客户端;  
  198.                 send(SocketJsonUtil.getOffLine(  
  199.                         UserInfoUtil.getUser(mContext)[0], "2"));  
  200.                 Log.i("send",  
  201.                         SocketJsonUtil.getOffLine(  
  202.                                 UserInfoUtil.getUser(mContext)[0], "2"));  
  203.             } else if (USER_OFFLINE_MSG_TYPE.equals(back_type)) {  
  204.                 // 用户的离线消息  
  205.                 HashMap<String, ArrayList<MessageEntity>> map = mParser  
  206.                         .offLineMsg(mContext, content);  
  207.                 // 将离线消息入库  
  208.                 saveOffLineMsg(map);  
  209.             }  
  210.         } catch (JSONException e) {  
  211.             this.close();  
  212.         }  
  213.     }  
  214.   
  215.     /*** 
  216.      * send broadcast <SocketConstant>ON_OPEN filter if this socket opened 
  217.      * socket 打开时发送的广播,若想提示,可以接受并处理 
  218.      *  
  219.      */  
  220.     @Override  
  221.     public void onOpen(ServerHandshake arg0) {  
  222.         Intent intent = new Intent(SGSocketConstant.ON_OPEN);  
  223.         mContext.sendBroadcast(intent);  
  224.     }  
  225.   
  226.     /*** 
  227.      * 检测正在运行tasktop的activity 
  228.      * @return current running activity name 
  229.      *  
  230.      */  
  231.     private String getRunningActivityName() {  
  232.         ActivityManager activityManager = (ActivityManager) mContext  
  233.                 .getSystemService(Context.ACTIVITY_SERVICE);  
  234.         String runningActivity = activityManager.getRunningTasks(1).get(0).topActivity  
  235.                 .getClassName();  
  236.         return runningActivity;  
  237.     }  
  238.   
  239.     /*** 
  240.      * send notification for this contacts 
  241.      * 发送通知 
  242.      * @param contacts 
  243.      *  
  244.      */  
  245.     @SuppressWarnings("deprecation")  
  246.     private void sendNotification(String contacts) {  
  247.         Intent intent = new Intent(mContext, MainActivity.class);  
  248.         mIntent = PendingIntent.getActivity(mContext, 100, intent, 0);  
  249.         mNoti.flags = Notification.FLAG_AUTO_CANCEL;  
  250.         mNoti.defaults = Notification.DEFAULT_VIBRATE;  
  251.         mNoti.setLatestEventInfo(mContext, "标题", "您有新消息!", mIntent);  
  252.         mNoti.contentView = new RemoteViews(mContext.getApplicationContext()  
  253.                 .getPackageName(), R.layout.notification_item);  
  254.         mNoti.contentView.setTextViewText(R.id.noti_message, "收到来自" + contacts  
  255.                 + "的新消息");  
  256.         mNotificationManager.notify(NOTIFICATION_ID, mNoti);  
  257.     }  
  258.   
  259.     /*** 
  260.      * 具体聊天收到的外来消息处理 
  261.      *  
  262.      * @param entity 
  263.      *            消息实体 
  264.      * @param messageType 
  265.      *            消息类型(买家/客服) 
  266.      */  
  267.     private void messageHandle(MessageEntity entity, String messageType) {  
  268.         String activity = getRunningActivityName();  
  269.         // 处于聊天的页面  
  270.         if ("com.ui.activity.ManageChartActivity".equals(activity)) {  
  271.             // 处于正在聊天对象的页面,将数据写入数据库,并发送广播更新页面数据  
  272.             if (KApplication.crurentContacts.equals(entity.getContacts())) {  
  273.                 /** 
  274.                  * 接收到的消息,消息实体entity的send_state字段状态设置为 
  275.                  * MSG_SEND_SUCCESS_STATE(即201) 
  276.                  **/  
  277.                 entity.setSend_state(SocketConstant.MSG_SEND_SUCCESS_STATE);// 收到的信息,设置信息的状态  
  278.                 entity.setRead(SocketConstant.READ_STATE);  
  279.                 MessageDB.getInstance(mContext).insert(entity, messageType);// 将数据写入数据库,  
  280.                 Intent intent = new Intent(SocketConstant.NEW_MESSAGE);  
  281.                 intent.putExtra("newmsg", entity);  
  282.                 mContext.sendBroadcast(intent);  
  283.                 // 没有处于闪聊对象的页面,将数据写入数据库,发送系统通知  
  284.             } else {  
  285.                 entity.setSend_state(SocketConstant.MSG_SEND_SUCCESS_STATE);// 收到的信息,设置信息的状态  
  286.                 entity.setRead(SocketConstant.DEFAULT_READ_STATE);  
  287.                 MessageDB.getInstance(mContext).insert(entity, messageType);  
  288.                 if (KApplication.sp.getBoolean(RefreshUtils.noteFlag, false)) {  
  289.                     sendNotification(entity.getContacts());  
  290.                 }  
  291.                 Intent intent = new Intent(  
  292.                         SocketConstant.RECEIVER_NEW_MESSAGE);  
  293.                 mContext.sendBroadcast(intent);  
  294.             }  
  295.             // 将数据写入数据库,发送系统通知  
  296.         } else {  
  297.             entity.setSend_state(SocketConstant.MSG_SEND_SUCCESS_STATE);// 收到的信息,设置信息的状态  
  298.             entity.setRead(SocketConstant.DEFAULT_READ_STATE);  
  299.             MessageDB.getInstance(mContext).insert(entity, messageType);  
  300.             Intent intent = new Intent();  
  301.             if ("com.ui.activity.ManageConversationActivity"  
  302.                     .equals(activity)  
  303.                     || "com.ui.activity.MainActivity"  
  304.                             .equals(activity)) {  
  305.                 intent.setAction(SocketConstant.RECEIVER_NEW_MESSAGE);// 聊天页面  
  306.             } else {  
  307.                 intent.setAction(SocketConstant.RECEIVER_NEW_MESSAGE_OTHER);// 其他页面  
  308.             }  
  309.             mContext.sendBroadcast(intent);  
  310.             if (KApplication.sp.getBoolean(RefreshUtils.noteFlag, false)) {  
  311.                 sendNotification(entity.getContacts());  
  312.             }  
  313.         }  
  314.           
  315.     }  
  316.   
  317.     /*** 
  318.      * 电脑与手机同步信息 
  319.      *  
  320.      * @param currentActivity 
  321.      */  
  322.     public void pcSynAndroid(String currentActivity) {  
  323.         if ("com.iflashseller.ui.activity.ManageChartActivity"  
  324.                 .equals(currentActivity)) {  
  325.             // 正好与该联系人对话的页面  
  326.             Intent intent = new Intent(SocketConstant.CHART_ACTIVITY);  
  327.             mContext.sendBroadcast(intent);  
  328.         } else {  
  329.             // 其他页面  
  330.             Intent intent = new Intent(SocketConstant.GROUPS_ACTIVITY);  
  331.             mContext.sendBroadcast(intent);  
  332.         }  
  333.     }  
  334.   
  335.     /*** 
  336.      * 检测是否有消息遗漏 
  337.      *  
  338.      * @param contacts 
  339.      *            联系人 
  340.      * @param webId 
  341.      *            服务端给出的消息Id 
  342.      */  
  343.     public void checkMsgWebId(String contacts, int webId) {  
  344.         // 集合中含有该联系人  
  345.         if (KApplication.webIds.containsKey(contacts)) {  
  346.             Log.i("miss", "保存的--" + KApplication.webIds.get(contacts));  
  347.             // 临时集合中保存的webId  
  348.             int c = KApplication.webIds.get(contacts);  
  349.             /*** 
  350.              * 如果新收到的消息的webId大于临时集合中保存的改联系人的webId,且他们之间的差值大于1, 
  351.              * 则请求服务器推送疑似丢失的webId对应的消息 
  352.              */  
  353.             if (webId > c && (webId - 1) != c) {  
  354.                 // id不连续  
  355.                 for (int i = c + 1; i < webId; i++) {  
  356.                     // 向服务器发送请求,获取遗漏的消息  
  357.                     String miss = SocketJsonUtil.getMissMsg(  
  358.                             UserInfoUtil.getUser(mContext)[0], "2", contacts,  
  359.                             "1", i + "");  
  360.                     this.send(miss);  
  361.                     Log.i("miss", miss);  
  362.                 }  
  363.                 /*** 
  364.                  * 如果他们之间的差值正好为1,则修改临时集合的改联系人的webId, 
  365.                  */  
  366.             } else if (webId > c && (webId - 1) == c) {  
  367.                 KApplication.webIds.put(contacts, webId);  
  368.                 Log.i("miss", "修改的--" + contacts + "--" + webId);  
  369.             }  
  370.             /**** 
  371.              * 临时集合中没有改联系人的信息,则将该联系人的webId存入临时集合. 
  372.              */  
  373.         } else {  
  374.             KApplication.webIds.put(contacts, webId);  
  375.             Log.i("miss", "新增--" + contacts + "--" + webId);  
  376.         }  
  377.     }  
  378.   
  379.     /*** 
  380.      * 将从服务端获取的联系人的webId存入临时集合 
  381.      *  
  382.      * @param map 
  383.      */  
  384.     public void saveContactsWebID(HashMap<String, String> map) {  
  385.         Iterator<Entry<String, String>> iter = map.entrySet().iterator();  
  386.         while (iter.hasNext()) {  
  387.             Entry<String, String> es = iter.next();  
  388.             String contacts = es.getKey();  
  389.             String maxWebID = es.getValue();  
  390.             KApplication.webIds.put(contacts, Integer.parseInt(maxWebID));  
  391.         }  
  392.     }  
  393.   
  394.     /*** 
  395.      * 将离线消息入库 
  396.      *  
  397.      * @param map 
  398.      */  
  399.     public void saveOffLineMsg(HashMap<String, ArrayList<MessageEntity>> map) {  
  400.         Iterator<Entry<String, ArrayList<MessageEntity>>> iter = map.entrySet()  
  401.                 .iterator();  
  402.         while (iter.hasNext()) {  
  403.             ArrayList<MessageEntity> msgs = iter.next().getValue();  
  404.             for (int i = 0; i < msgs.size(); i++) {  
  405.                 threadSleep(100);  
  406.                 MessageDB.getInstance(mContext).insert(msgs.get(i),  
  407.                         SocketConstant.MSG_TYPE_BUYER);  
  408.                 Log.i("write", "离线数据入库---" + msgs.get(i).toString());  
  409.             }  
  410.             /*** 
  411.              * 如果服务端一次释放的离线消息大于等于10条,则继续请求释放离线消息. 
  412.              */  
  413.             if (msgs.size() >= 10) {  
  414.                 send(SocketJsonUtil.getOffLine(  
  415.                         UserInfoUtil.getUser(mContext)[0], "2"));  
  416.                 Log.i("send",  
  417.                         SocketJsonUtil.getOffLine(  
  418.                                 UserInfoUtil.getUser(mContext)[0], "2"));  
  419.             }  
  420.         }  
  421.         // 一轮消息入库结束,发送通知,更新UI;  
  422.         mContext.sendBroadcast(new Intent(  
  423.                 SocketConstant.OFFLINE_MSG_RECEIVER_SUCCESS));  
  424.     }  
  425.   
  426.     private void threadSleep(long time) {  
  427.         try {  
  428.             Thread.currentThread();  
  429.             Thread.sleep(time);  
  430.         } catch (InterruptedException e) {  
  431.             e.printStackTrace();  
  432.         }  
  433.     }  
  434. }  

至于数据库的一部分代码就不贴出了,无非是增删改查。

 

 

 

下面贴出部分监控链接状态的代码,以保证能即时的收到消息;

 

[java] view plain copy
 
  1. public class LApplication extends Application {  
  2.   
  3.     public static String TAG = LApplication.class.getSimpleName();  
  4.     /** 接收消息广播 **/  
  5.     private LPullReceiver mPullReceiver;  
  6.     /** 是否是正在登录 **/  
  7.     public static boolean isLoging = false;  
  8.     /** socket管理类 **/  
  9.     private LPushManager mWebSocket;  
  10.       
  11.       
  12.   
  13.     @Override  
  14.     public void onCreate() {  
  15.         super.onCreate();  
  16.         /*** 
  17.          * 注册接收消息的广播 
  18.          */  
  19.         mPullReceiver = new LPullReceiver();  
  20.         // 广播过滤  
  21.         IntentFilter filter = new IntentFilter();  
  22.         // 时钟信息发生变化  
  23.         filter.addAction(Intent.ACTION_TIME_TICK);  
  24.         // 开机广播  
  25.         filter.addAction(Intent.ACTION_BOOT_COMPLETED);  
  26.         // 网络状态发生变化  
  27.         filter.addAction(ConnectivityManager.CONNECTIVITY_ACTION);  
  28.         // 屏幕打开  
  29.         filter.addAction(Intent.ACTION_SCREEN_ON);  
  30.         // 注册广播  
  31.         registerReceiver(mPullReceiver, filter);  
  32.         // 实例化socket管理类  
  33.         mWebSocket = new LPushManager(getApplicationContext());  
  34.         // 应用重启一次,默认socket为关闭状态  
  35.         LPushUser.saveSocket(getApplicationContext(), false);  
  36.         // 默认链接没有被拒绝  
  37.         LPushUser.saveConnect(getApplicationContext(), false);  
  38.         // 1,获取当前时间  
  39.         long currentTime = System.currentTimeMillis();  
  40.         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
  41.         Date date = new Date(currentTime);  
  42.         String time = sdf.format(date);  
  43.         // 修改标记的时间,保证5分钟内链接一次  
  44.         LPushUser.saveOpenTime(getApplicationContext(), time);  
  45.     }  
  46.   
  47.     /** 
  48.      * 广播接口类 
  49.      * <ol> 
  50.      * <li>接收时钟发生变化的广播 
  51.      * <li>接收网络发生变化的广播 
  52.      * <li>接收开机发送广播 
  53.      * <li>接收用户登录后发送的广播 
  54.      * </ol> 
  55.      *  
  56.      * @author li‘mingqi 
  57.      *  
  58.      */  
  59.     class LPullReceiver extends BroadcastReceiver {  
  60.         @SuppressLint("SimpleDateFormat")  
  61.         @Override  
  62.         public void onReceive(Context context, Intent intent) {  
  63.             String action = intent.getAction();  
  64.             if (Intent.ACTION_TIME_TICK.equals(action)  
  65.                     || ConnectivityManager.CONNECTIVITY_ACTION.equals(action)  
  66.                     || Intent.ACTION_BOOT_COMPLETED.equals(action)  
  67.                     || Intent.ACTION_SCREEN_ON.equals(action)) {  
  68.   
  69.                 if (LPushUser.GETLOG(getApplicationContext()))  
  70.                     Log.i("lpush",  
  71.                             "socket的链接状态----"  
  72.                                     + LPushUser  
  73.                                             .getSocket(getApplicationContext())  
  74.                                     + "是否正在链接--" + isLoging + "----" + action);  
  75.                 // 当时钟或者网络发生变化或者socket关闭或者发生异常时或者开机 时,判断socket连接是否出现异常  
  76.                 if (!LPushUser.getSocket(getApplicationContext()) && !isLoging  
  77.                         && LPushUser.getSocketEnable(getApplicationContext())  
  78.                         && !"".equals(IP) && !"".equals(PORT)  
  79.                         && !LPushUser.getConnectState(getApplicationContext())) {  
  80.                     // 掉线,执行登录动作  
  81.                       
  82.                       
  83.                     if (LNetworkUtil.netIsEnable(getApplicationContext())) {  
  84.                         mWebSocket.secondMethod(IP, PORT);  
  85.                         // 开始登录了,标记打开时间  
  86.                         // 1,获取当前时间  
  87.                         long currentTime = System.currentTimeMillis();  
  88.                         SimpleDateFormat sdf = new SimpleDateFormat(  
  89.                                 "yyyy-MM-dd HH:mm:ss");  
  90.                         Date date = new Date(currentTime);  
  91.                         String time = sdf.format(date);  
  92.                         // 修改标记的时间,保证5分钟嗅探链接一次  
  93.                         LPushUser.saveOpenTime(getApplicationContext(), time);  
  94.                     }  
  95.                 } else {  
  96.                     // APP端已经处于链接正常状态 -----5分钟嗅探链接一次  
  97.                     // 1,获取当前时间  
  98.                     long currentTime = System.currentTimeMillis();  
  99.                     SimpleDateFormat sdf = new SimpleDateFormat(  
  100.                             "yyyy-MM-dd HH:mm:ss");  
  101.                     Date date = new Date(currentTime);  
  102.                     String time = sdf.format(date);  
  103.                     // 2,比对链接打开时间  
  104.                     long minTime = LStringManager.dateDifference(  
  105.                             LPushUser.getOpenTime(getApplicationContext()),  
  106.                             time);  
  107.                     if (LPushUser.GETLOG(getApplicationContext())) {  
  108.                         Log.i("lpush",  
  109.                                 "链接时长----现在时间:"  
  110.                                         + time  
  111.                                         + ";保存时间:"  
  112.                                         + LPushUser  
  113.                                                 .getOpenTime(getApplicationContext())  
  114.                                         + ";时差" + minTime + "分钟");  
  115.                     }  
  116.                     if (minTime >= 5) {  
  117.                         // 大于等于5分钟,则重新链接  
  118.                           
  119.                         // 5分钟之后重新链接  
  120.                         // 修改被拒绝状态  
  121.                         if (LPushUser.getConnectState(getApplicationContext())) {  
  122.                             LPushUser.saveConnect(getApplicationContext(),  
  123.                                     false);  
  124.                         }  
  125.                         if (LNetworkUtil.netIsEnable(getApplicationContext())  
  126.                                 && LPushUser  
  127.                                         .getSocketEnable(getApplicationContext())) {  
  128.                             mWebSocket.secondMethod(IP, PORT);  
  129.                             // 修改标记的时间,保证5分钟嗅探链接一次  
  130.                             LPushUser.saveOpenTime(getApplicationContext(),  
  131.                                     time);  
  132.                         }  
  133.                     }  
  134.                 }  
  135.             }  
  136.         }  
  137.     }  
  138.   
  139.     /*** 
  140.      * 设置推送功能的使用与否,默认使用推送功能,若是关闭推送功能请设置false; 
  141.      *  
  142.      * @param enable 
  143.      *            是否使用 
  144.      *  
  145.      *            li‘mingqi  
  146.      */  
  147.     protected void setLPushEnable(boolean enable) {  
  148.         LPushUser.saveSocketEnable(getApplicationContext(), enable);  
  149.     }  
  150.   
  151.       
  152.     /*** 
  153.      *  
  154.      *  
  155.      * @param ip 
  156.      *            ip信息 
  157.      * @param port 
  158.      *            端口信息 
  159.      *  
  160.      *            li‘mingqi  
  161.      */  
  162.     protected void setSocketIPInfo(String ip, String port) {  
  163.         this.IP = ip;  
  164.         this.PORT = port;  
  165.     }  
  166.   
  167.       
  168.   
  169.     /** 
  170.      * 设置用户的Uid 
  171.      *  
  172.      * @param uid 
  173.      *            li‘mingqi  
  174.      */  
  175.     public void setUserInfo(int uid, String code) {  
  176.         /*** 
  177.          * 数据验证 
  178.          */  
  179.         // if (0 == uid || null == code || "".equals(code)) {  
  180.         // Log.e(TAG, "您输入的用户ID或者CODE值为空");  
  181.         // new NullPointerException("您输入的用户ID或者CODE值为空").printStackTrace();  
  182.         // return;  
  183.         // }  
  184.   
  185.         // 保存用户ID  
  186.         LPushUser.saveUserID(getApplicationContext(), uid);  
  187.         // 保存用户CODE  
  188.         LPushUser.saveUserCode(getApplicationContext(), code);  
  189.         // 重启链接  
  190.         mWebSocket.close();  
  191.     }  
  192.   
  193.     /*** 
  194.      * 设置是否查看日志 
  195.      *  
  196.      * @param flag 
  197.      *            是否查看日志 
  198.      * @version 1.2 li‘mingqi 
  199.      */  
  200.     public void openLogInfo(boolean flag) {  
  201.         LPushUser.SAVELOG(getApplicationContext(), flag);  
  202.     }  
  203.   
  204.     /*** 
  205.      * socket链接重置,服务器一直处于拒绝链接状态,客户端链接一次遭拒后标记了遭拒的状态,重置之后可进行再次开启链接; 
  206.      *  
  207.      * @version 1.3 li‘mingqi  
  208.      */  
  209.     private void reset() {  
  210.         LPushUser.saveConnect(getApplicationContext(), false);  
  211.     }  
  212. }  

UI界面显示部分就不贴出了,后面贴出Application类的目的就是监控各种手机系统的广播来嗅探自己的websocket链接的状态;用来维持它以保证能即时的收到消息;



以上是关于Websocket实现即时通讯的主要内容,如果未能解决你的问题,请参考以下文章

Android 使用WebSocket实现即时通讯功能,聊天功能

nodejs+expressjs+ws实现了websocket即时通讯,服务器和客户端互相通信

Websocket实现即时通讯

深入即时通讯开发协议WebSocket协议细节

HTML5+NodeJs实现WebSocket即时通讯

使用tomcat方式实现websocket即时通讯服务端讲解