iOS即时通讯之CocoaAsyncSocket源码解析五

Posted Francis

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了iOS即时通讯之CocoaAsyncSocket源码解析五相关的知识,希望对你有一定的参考价值。

接上篇:iOS即时通讯之CocoaAsyncSocket源码解析四         原文

前言:

本文为CocoaAsyncSocket Read篇终,将重点涉及该框架是如何利用缓冲区对数据进行读取、以及各种情况下的数据包处理,其中还包括普通的、和基于TLS的不同读取操作等等。

正文:

前文讲完了两次TLS建立连接的流程,接着就是本篇的重头戏了:doReadData方法。在这里我不准备直接把这个整个方法列出来,因为就光这一个方法,加上注释有1200行,整个贴过来也无法展开描述,所以在这里我打算对它分段进行讲解:

注:以下代码整个包括在doReadData大括号中:

//读取数据
- (void)doReadData
{
  ....
}

Part1.无法正常读取数据时的前置处理:

 1 //如果当前读取的包为空,或者flag为读取停止,这两种情况是不能去读取数据的
 2 if ((currentRead == nil) || (flags & kReadsPaused))
 3 {
 4      LogVerbose(@"No currentRead or kReadsPaused");
 5 
 6      // Unable to read at this time
 7      //如果是安全的通信,通过TLS/SSL
 8      if (flags & kSocketSecure)
 9      {
10        //刷新SSLBuffer,把数据从链路上移到prebuffer中 (当前不读取数据的时候做)
11           [self flushSSLBuffers];
12      }
13 
14    //判断是否用的是 CFStream的TLS
15      if ([self usingCFStreamForTLS])
16      {
17 
18      }
19      else
20      {
21        //挂起source
22           if (socketFDBytesAvailable > 0)
23           {
24                [self suspendReadSource];
25           }
26      }
27      return;
28 }

当我们当前读取的包是空或者标记为读停止状态的时候,则不会去读取数据。
前者不难理解,因为我们要读取的数据最终是要传给currentRead中去的,所以如果currentRead为空,我们去读数据也没有意义。
后者kReadsPaused标记是从哪里加上的呢?我们全局搜索一下,发现它才read超时的时候被添加。
讲到这我们顺便来看这个读取超时的一个逻辑,我们每次做读取任务传进来的超时,都会调用这么一个方法:

[self setupReadTimerWithTimeout:currentRead->timeout];
 1 //初始化读的超时
 2 - (void)setupReadTimerWithTimeout:(NSTimeInterval)timeout
 3 {
 4     if (timeout >= 0.0)
 5     {
 6         //生成一个定时器source
 7         readTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, socketQueue);
 8 
 9         __weak GCDAsyncSocket *weakSelf = self;
10 
11         //句柄
12         dispatch_source_set_event_handler(readTimer, ^{ @autoreleasepool {
13         #pragma clang diagnostic push
14         #pragma clang diagnostic warning "-Wimplicit-retain-self"
15 
16             __strong GCDAsyncSocket *strongSelf = weakSelf;
17             if (strongSelf == nil) return_from_block;
18 
19             //执行超时操作
20             [strongSelf doReadTimeout];
21 
22         #pragma clang diagnostic pop
23         }});
24 
25         #if !OS_OBJECT_USE_OBJC
26         dispatch_source_t theReadTimer = readTimer;
27 
28         //取消的句柄
29         dispatch_source_set_cancel_handler(readTimer, ^{
30         #pragma clang diagnostic push
31         #pragma clang diagnostic warning "-Wimplicit-retain-self"
32 
33             LogVerbose(@"dispatch_release(readTimer)");
34             dispatch_release(theReadTimer);
35 
36         #pragma clang diagnostic pop
37         });
38         #endif
39 
40         //定时器延时 timeout时间执行
41         dispatch_time_t tt = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(timeout * NSEC_PER_SEC));
42         //间隔为永远,即只执行一次
43         dispatch_source_set_timer(readTimer, tt, DISPATCH_TIME_FOREVER, 0);
44         dispatch_resume(readTimer);
45     }
46

这个方法定义了一个GCD定时器,这个定时器只执行一次,间隔就是我们的超时,很显然这是一个延时执行,那小伙伴要问了,这里为什么我们不用NSTimer或者下面这种方式:

[self performSelector:<#(nonnull SEL)#> withObject:<#(nullable id)#> afterDelay:<#(NSTimeInterval)#>

原因很简单,performSelector是基于runloop才能使用的,它本质是转化成runloop基于非端口的源source0。很显然我们所在的socketQueue开辟出来的线程,并没有添加一个runloop。而NSTimer也是一样。

所以这里我们用GCD Timer,因为它是基于XNU内核来实现的,并不需要借助于runloop

这里当超时时间间隔到达时,我们会执行超时操作:

[strongSelf doReadTimeout];
 1 /执行超时操作
 2 - (void)doReadTimeout
 3 {
 4     // This is a little bit tricky.
 5     // Ideally we\'d like to synchronously query the delegate about a timeout extension.
 6     // But if we do so synchronously we risk a possible deadlock.
 7     // So instead we have to do so asynchronously, and callback to ourselves from within the delegate block.
 8 
 9     //因为这里用同步容易死锁,所以用异步从代理中回调
10 
11     //标记读暂停
12     flags |= kReadsPaused;
13 
14     __strong id theDelegate = delegate;
15 
16     //判断是否实现了延时  补时的代理
17     if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:shouldTimeoutReadWithTag:elapsed:bytesDone:)])
18     {
19         //拿到当前读的包
20         GCDAsyncReadPacket *theRead = currentRead;
21 
22         //代理queue中回调
23         dispatch_async(delegateQueue, ^{ @autoreleasepool {
24 
25             NSTimeInterval timeoutExtension = 0.0;
26 
27             //调用代理方法,拿到续的时长
28             timeoutExtension = [theDelegate socket:self shouldTimeoutReadWithTag:theRead->tag
29                                                                          elapsed:theRead->timeout
30                                                                        bytesDone:theRead->bytesDone];
31 
32             //socketQueue中,做延时
33             dispatch_async(socketQueue, ^{ @autoreleasepool {
34 
35                 [self doReadTimeoutWithExtension:timeoutExtension];
36             }});
37         }});
38     }
39     else
40     {
41         [self doReadTimeoutWithExtension:0.0];
42     }
43 }
//做读取数据延时
- (void)doReadTimeoutWithExtension:(NSTimeInterval)timeoutExtension
{
    if (currentRead)
    {
        if (timeoutExtension > 0.0)
        {
            //把超时加上
            currentRead->timeout += timeoutExtension;

            // Reschedule the timer
            //重新生成时间
            dispatch_time_t tt = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(timeoutExtension * NSEC_PER_SEC));
            //重置timer时间
            dispatch_source_set_timer(readTimer, tt, DISPATCH_TIME_FOREVER, 0);

            // Unpause reads, and continue
            //在把paused标记移除
            flags &= ~kReadsPaused;
            //继续去读取数据
            [self doReadData];
        }
        else
        {
            //输出读取超时,并断开连接
            LogVerbose(@"ReadTimeout");

            [self closeWithError:[self readTimeoutError]];
        }
    }
}

这里调用了续时代理,如果我们实现了这个代理,则可以增加这个超时时间,然后重新生成超时定时器,移除读取停止的标记kReadsPaused。继续去读取数据。
否则我们就断开socket
注意:这个定时器会被取消,如果当前数据包被读取完成,这样就不会走到定时器超时的时间,则不会断开socket。讲到这是不是大家就有印象了?这个就是之前在楼主:
iOS即时通讯,从入门到“放弃”?中讲过的可以被用来做PingPong机制的原理。

我们接着回到doReadData中,我们讲到如果当前读取包为空或者状态为kReadsPaused,我们就去执行一些非读取数据的处理。
这里我们第一步去判断当前连接是否为kSocketSecure,也就是安全通道的TLS。如果是我们则调用

if (flags & kSocketSecure)
{
     //刷新,把TLS加密型的数据从链路上移到prebuffer中 (当前暂停的时候做)
     [self flushSSLBuffers];
}

按理说,我们有当前读取包的时候,在去从prebuffersocket中去读取,但是这里为什么要提前去读呢?
我们来看看这个框架作者的解释:

// Here\'s the situation:
// We have an established secure connection.
// There may not be a currentRead, but there might be encrypted data sitting around for us.
// When the user does get around to issuing a read, that encrypted data will need to be decrypted.
// So why make the user wait?
// We might as well get a head start on decrypting some data now.
// The other reason we do this has to do with detecting a socket disconnection.
// The SSL/TLS protocol has it\'s own disconnection handshake.
// So when a secure socket is closed, a "goodbye" packet comes across the wire.
// We want to make sure we read the "goodbye" packet so we can properly detect the TCP disconnection.

简单来讲,就是我们用TLS类型的Socket,读取数据的时候需要解密的过程,而这个过程是费时的,我们没必要让用户在读取数据的时候去等待这个解密的过程,我们可以提前在数据一到达,就去读取解密。
而且这种方式,还能时刻根据TLSgoodbye包来准确的检测到TCP断开连接。

在我们来看flushSSLBuffers方法之前,我们先来看看这个一直提到的全局缓冲区prebuffer的定义,它其实就是下面这么一个类的实例:

Part3.GCDAsyncSocketPreBuffer的定义

@interface GCDAsyncSocketPreBuffer : NSObject
{
    //unsigned char
    //提前的指针,指向这块提前的缓冲区
    uint8_t *preBuffer;
    //size_t 它是一个与机器相关的unsigned类型,其大小足以保证存储内存中对象的大小。
    //它可以存储在理论上是可能的任何类型的数组的最大大小
    size_t preBufferSize;
    //读的指针
    uint8_t *readPointer;
    //写的指针
    uint8_t *writePointer;
}

里面存了3个指针,包括preBuffer起点指针、当前读写所处位置指针、以及一个preBufferSize,这个sizepreBuffer所指向的位置,在内存中分配的空间大小。

我们来看看它的几个方法:

//初始化
- (id)initWithCapacity:(size_t)numBytes
{
    if ((self = [super init]))
    {
        //设置size
        preBufferSize = numBytes;
        //申请size大小的内存给preBuffer
        preBuffer = malloc(preBufferSize);

        //为同一个值
        readPointer = preBuffer;
        writePointer = preBuffer;
    }
    return self;
}

包括一个初始化方法,去初始化preBufferSize大小的一块内存空间。然后3个指针都指向这个空间。

- (void)dealloc
{
    if (preBuffer)
        free(preBuffer);
}

销毁的方法:释放preBuffer。

 1 //确认读的大小
 2 - (void)ensureCapacityForWrite:(size_t)numBytes
 3 {
 4     //拿到当前可用的空间大小
 5     size_t availableSpace = [self availableSpace];
 6 
 7     //如果申请的大小大于可用的大小
 8     if (numBytes > availableSpace)
 9     {
10         //需要多出来的大小
11         size_t additionalBytes = numBytes - availableSpace;
12         //新的总大小
13         size_t newPreBufferSize = preBufferSize + additionalBytes;
14         //重新去分配preBuffer
15         uint8_t *newPreBuffer = realloc(preBuffer, newPreBufferSize);
16 
17         //读的指针偏移量(已读大小)
18         size_t readPointerOffset = readPointer - preBuffer;
19         //写的指针偏移量(已写大小)
20         size_t writePointerOffset = writePointer - preBuffer;
21         //提前的Buffer重新复制
22         preBuffer = newPreBuffer;
23         //大小重新赋值
24         preBufferSize = newPreBufferSize;
25 
26         //读写指针重新赋值 + 上偏移量
27         readPointer = preBuffer + readPointerOffset;
28         writePointer = preBuffer + writePointerOffset;
29     }
30 }

确保prebuffer可用空间的方法:这个方法会重新分配preBuffer,直到可用大小等于传递进来的numBytes,已用大小不会变。

 1 //仍然可读的数据,过程是先写后读,只有写的大于读的,才能让你继续去读,不然没数据可读了
 2 - (size_t)availableBytes
 3 {
 4     return writePointer - readPointer;
 5 }
 6 
 7 - (uint8_t *)readBuffer
 8 {
 9     return readPointer;
10 }
11 
12 - (void)getReadBuffer:(uint8_t **)bufferPtr availableBytes:(size_t *)availableBytesPtr
13 {
14     if (bufferPtr) *bufferPtr = readPointer;
15     if (availableBytesPtr) *availableBytesPtr = [self availableBytes];
16 }
17 
18 //读数据的指针
19 - (void)didRead:(size_t)bytesRead
20 {
21     readPointer += bytesRead;
22     //如果读了这么多,指针和写的指针还相同的话,说明已经读完,重置指针到最初的位置
23     if (readPointer == writePointer)
24     {
25         // The prebuffer has been drained. Reset pointers.
26         readPointer  = preBuffer;
27         writePointer = preBuffer;
28     }
29 }
30 //prebuffer的剩余空间  = preBufferSize(总大小) - (写的头指针 - preBuffer一开的指针,即已被写的大小)
31 
32 - (size_t)availableSpace
33 {
34     return preBufferSize - (writePointer - preBuffer);
35 }
36 
37 - (uint8_t *)writeBuffer
38 {
39     return writePointer;
40 }
41 
42 - (void)getWriteBuffer:(uint8_t **)bufferPtr availableSpace:(size_t *)availableSpacePtr
43 {
44     if (bufferPtr) *bufferPtr = writePointer;
45     if (availableSpacePtr) *availableSpacePtr = [self availableSpace];
46 }
47 
48 - (void)didWrite:(size_t)bytesWritten
49 {
50     writePointer += bytesWritten;
51 }
52 
53 - (void)reset
54 {
55     readPointer  = preBuffer;
56     writePointer = preBuffer;
57 }

然后就是对读写指针进行处理的方法,如果读了多少数据readPointer就后移多少,写也是一样。
而获取当前未读数据,则是用已写指针-已读指针,得到的差值,当已读=已写的时候,说明prebuffer数据读完,则重置读写指针的位置,还是指向初始化位置。

讲完全局缓冲区对于指针的处理,我们接着往下说

Part4.flushSSLBuffers方法:

  1 //缓冲ssl数据
  2 - (void)flushSSLBuffers
  3 {
  4      LogTrace();
  5      //断言为安全Socket
  6      NSAssert((flags & kSocketSecure), @"Cannot flush ssl buffers on non-secure socket");
  7      //如果preBuffer有数据可读,直接返回
  8      if ([preBuffer availableBytes] > 0)
  9      {
 10           return;
 11      }
 12 
 13      #if TARGET_OS_IPHONE
 14      //如果用的CFStream的TLS,把数据用CFStream的方式搬运到preBuffer中
 15      if ([self usingCFStreamForTLS])
 16      {
 17         //如果flag为kSecureSocketHasBytesAvailable,而且readStream有数据可读
 18           if ((flags & kSecureSocketHasBytesAvailable) && CFReadStreamHasBytesAvailable(readStream))
 19           {
 20                LogVerbose(@"%@ - Flushing ssl buffers into prebuffer...", THIS_METHOD);
 21 
 22             //默认一次读的大小为4KB??
 23                CFIndex defaultBytesToRead = (1024 * 4);
 24 
 25             //用来确保有这么大的提前buffer缓冲空间
 26                [preBuffer ensureCapacityForWrite:defaultBytesToRead];
 27                //拿到写的buffer
 28                uint8_t *buffer = [preBuffer writeBuffer];
 29 
 30             //从readStream中去读, 一次就读4KB,读到数据后,把数据写到writeBuffer中去   如果读的大小小于readStream中数据流大小,则会不停的触发callback,直到把数据读完为止。
 31                CFIndex result = CFReadStreamRead(readStream, buffer, defaultBytesToRead);
 32             //打印结果
 33                LogVerbose(@"%@ - CFReadStreamRead(): result = %i", THIS_METHOD, (int)result);
 34 
 35             //大于0,说明读写成功
 36                if (result > 0)
 37                {
 38                 //把写的buffer头指针,移动result个偏移量
 39                     [preBuffer didWrite:result];
 40                }
 41 
 42             //把kSecureSocketHasBytesAvailable 仍然可读的标记移除
 43                flags &= ~kSecureSocketHasBytesAvailable;
 44           }
 45 
 46           return;
 47      }
 48 
 49      #endif
 50 
 51     //不用CFStream的处理方法
 52 
 53     //先设置一个预估可用的大小
 54      __block NSUInteger estimatedBytesAvailable = 0;
 55      //更新预估可用的Block
 56      dispatch_block_t updateEstimatedBytesAvailable = ^{
 57 
 58         //预估大小 = 未读的大小 + SSL的可读大小
 59           estimatedBytesAvailable = socketFDBytesAvailable + [sslPreBuffer availableBytes];
 60 
 61 
 62           size_t sslInternalBufSize = 0;
 63         //获取到ssl上下文的大小,从sslContext中
 64           SSLGetBufferedReadSize(sslContext, &sslInternalBufSize);
 65           //再加上下文的大小
 66           estimatedBytesAvailable += sslInternalBufSize;
 67      };
 68 
 69     //调用这个Block
 70      updateEstimatedBytesAvailable();
 71 
 72     //如果大于0,说明有数据可读
 73      if (estimatedBytesAvailable > 0)
 74      {
 75 
 76           LogVerbose(@"%@ - Flushing ssl buffers into prebuffer...", THIS_METHOD);
 77 
 78         //标志,循环是否结束,SSL的方式是会阻塞的,直到读的数据有estimatedBytesAvailable大小为止,或者出错
 79           BOOL done = NO;
 80           do
 81           {
 82                LogVerbose(@"%@ - estimatedBytesAvailable = %lu", THIS_METHOD, (unsigned long)estimatedBytesAvailable);
 83 
 84                // Make sure there\'s enough room in the prebuffer
 85                //确保有足够的空间给prebuffer
 86                [preBuffer ensureCapacityForWrite:estimatedBytesAvailable];
 87 
 88                // Read data into prebuffer
 89                //拿到写的buffer
 90                uint8_t *buffer = [preBuffer writeBuffer];
 91                size_t bytesRead = 0;
 92                //用SSLRead函数去读,读到后,把数据写到buffer中,estimatedBytesAvailable为需要读的大小,bytesRead这一次实际读到字节大小,为sslContext上下文
 93                OSStatus result = SSLRead(sslContext, buffer, (size_t)estimatedBytesAvailable, &bytesRead);
 94                LogVerbose(@"%@ - read from secure socket = %u", THIS_METHOD, (unsigned)bytesRead);
 95 
 96             //把写指针后移bytesRead大小
 97                if (bytesRead > 0)
 98                {
 99                     [preBuffer didWrite:bytesRead];
100                }
101 
102                LogVerbose(@"%@ - prebuffer.length = %zu", THIS_METHOD, [preBuffer availableBytes]);
103 
104             //如果读数据出现错误
105                if (result != noErr)
106                {
107                     done = YES;
108                }
109                else
110                {
111                 //在更新一下可读的数据大小
112                     updateEstimatedBytesAvailable();
113                }
114 
115           }
116         //只有done为NO,而且 estimatedBytesAvailable大于0才继续循环
117         while (!done && estimatedBytesAvailable > 0);
118      }
119 }

这个方法有点略长,包含了两种SSL的数据处理:

  1. CFStream类型:我们会调用下面这个函数去从stream并且读取数据并解密:
  2. CFIndex result = CFReadStreamRead(readStream, buffer, defaultBytesToRead);

    数据被读取到后,直接转移到了prebuffer中,并且调用:

  3. [preBuffer didWrite:result];

    让写指针后移读取到的数据大小。
    这里有两个关于CFReadStreamRead方法,需要注意的问题:
    1)就是我们调用它去读取4KB数据,并不仅仅是只读这么多,而是因为这个方法是会递归调用的,它每次只读4KB,直到把stream中的数据读完。
    2)我们之前设置的CFStream函数的回调,在数据来了之后只会被触发一次,以后数据再来都不会触发。直到我们调用这个方法,把stream中的数据读完,下次再来数据才会触发函数回调。这也是我们在使用CFStream的时候,不需要担心像source那样,有数据会不断的被触发回调,而需要挂起像source那样挂起stream(实际也没有这样的方法)。

2. SSL安全通道类型:这里我们主要是循环去调用下面这个函数去读取数据:

OSStatus result = SSLRead(sslContext, buffer, (size_t)estimatedBytesAvailable, &bytesRead);

 

其他的基本和CFStream一致

这里需要注意的是SSLRead这个方法,并不是直接从我们的socket中获取到的数据,而是从我们一开始绑定的SSL回调函数中,得到数据。而回调函数本身,也需要调用read函数从socket中获取到加密的数据。然后再经由SSLRead这个方法,数据被解密,并且传递给buffer

至于SSLRead绑定的回调函数,是怎么处理数据读取的,因为它处理数据的流程,和我们doReadData后续数据读取处理基本相似,所以现在暂时不提。

 

我们绕了一圈,讲完了这个包为空或者当前暂停状态下的前置处理,总结一下:

  1. 就是如果是SSL类型的数据,那么先解密了,缓冲到prebuffer中去。
  2. 判断当前socket可读数据大于0,非CFStreamSSL类型,则挂起source,防止反复触发。

Part5.接着我们开始doReadData正常数据处理流程:

首先它大的方向,依然是分为3种类型的数据处理:
1.SSL安全通道; 2.CFStream类型SSL; 3.普通数据传输。
因为这3种类型的代码,重复部分较大,处理流程基本类似,只不过调用读取方法所有区别:

//1.
OSStatus result = SSLRead(sslContext, buffer, (size_t)estimatedBytesAvailable, &bytesRead);
//2.
CFIndex result = CFReadStreamRead(readStream, buffer, defaultBytesToRead);
//3.
ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);

SSLRead回调函数内部,也调用了第3种read读取,这个我们后面会说。
现在这里我们将跳过前两种(方法部分调用可以见上面

以上是关于iOS即时通讯之CocoaAsyncSocket源码解析五的主要内容,如果未能解决你的问题,请参考以下文章

IOS 即时通讯 + 微信聊天框架 + 源码

李洪强关于即时通讯

iOS开发之即时通讯之Socket(AsyncSocket)

iOS即时通讯之简单聊天室

iOS开发之即时通讯之Socket(AsyncSocket)

iOS开发之即时通讯之Socket(AsyncSocket)