网络编程之手把手教你写基于TCP的Socket长连接

Posted 大忽悠爱忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络编程之手把手教你写基于TCP的Socket长连接相关的知识,希望对你有一定的参考价值。


TCP/IP 协议简介

IP协议

首先我们看 IP(Internet Protocol)协议。IP 协议提供了主机和主机间的通信。

为了完成不同主机的通信,我们需要某种方式来唯一标识一台主机,这个标识,就是著名的IP地址。通过IP地址,IP 协议就能够帮我们把一个数据包发送给对方


TCP协议

前面我们说过,IP 协议提供了主机和主机间的通信。TCP 协议在 IP 协议提供的主机间通信功能的基础上,完成这两个主机上进程对进程的通信。

有了 IP,不同主机就能够交换数据。但是,计算机收到数据后,并不知道这个数据属于哪个进程(简单讲,进程就是一个正在运行的应用程序)。TCP 的作用就在于,让我们能够知道这个数据属于哪个进程,从而完成进程间的通信。

为了标识数据属于哪个进程,我们给需要进行 TCP 通信的进程分配一个唯一的数字来标识它。这个数字,就是我们常说的端口号。

TCP 的全称是 Transmission Control Protocol,大家对它说得最多的,大概就是面向连接的特性了。之所以说它是有连接的,是说在进行通信前,通信双方需要先经过一个三次握手的过程。三次握手完成后,连接便建立了。这时候我们才可以开始发送/接收数据。(与之相对的是 UDP,不需要经过握手,就可以直接发送数据)。


下面我们简单了解一下三次握手的过程:

  • 首先,客户向服务端发送一个 SYN,假设此时 sequence number 为 x。这个 x是由操作系统根据一定的规则生成的,不妨认为它是一个随机数;
  • 服务端收到 SYN 后,会向客户端再发送一个 SYN,此时服务器的 seq number = y。与此同时,会 ACK x+1,告诉客户端“已经收到了 SYN,可以发送数据了”;
  • 客户端收到服务器的 SYN 后,回复一个 ACK y+1,这个 ACK 则是告诉服务器,SYN 已经收到,服务器可以发送数据了。

经过这 3 步,TCP 连接就建立了,这里需要注意的有三点:

  • 连接是由客户端主动发起的;
  • 在第 3 步客户端向服务器回复 ACK 的时候,TCP 协议是允许我们携带数据的。之所以做不到,是 API 的限制导致的;
  • TCP 协议还允许 “四次握手” 的发生,同样的,由于 API 的限制,这个极端的情况并不会发生。

Socket 基本用法

Socket 是 TCP 层的封装,通过 socket,我们就能进行 TCP 通信。

在 Java 的 SDK 中,socket 的共有两个接口:用于监听客户连接的 ServerSocket 和用于通信的 Socket

使用 socket 的步骤如下:

1)创建 ServerSocket 并监听客户连接;
2)使用 Socket 连接服务端;
3)通过 Socket.getInputStream()/getOutputStream() 获取输入输出流进行通信。

下面,我们通过实现一个简单的 echo 服务来学习 socket 的使用。所谓的 echo 服务,就是客户端向服务端写入任意数据,服务器都将数据原封不动地写回给客户端。


第一步:创建 ServerSocket 并监听客户连接

public class EchoServer 
 
    private final ServerSocket mServerSocket;
 
    public EchoServer(int port) throws IOException 
        // 1. 创建一个 ServerSocket 并监听端口 port
        mServerSocket = new ServerSocket(port);
    
 
    public void run() throws IOException 
        // 2. 开始接受客户连接
        Socket client = mServerSocket.accept();
        handleClient(client);
    
 
    private void handleClient(Socket socket) 
        // 3. 使用 socket 进行通信 ...
    
 
 
    public static void main(String[] argv) 
        try 
            EchoServer server = new EchoServer(9877);
            server.run();
         catch (IOException e) 
            e.printStackTrace();
        
    


第二步:使用 Socket 连接服务端

public class EchoClient 
 
    private final Socket mSocket;
 
    public EchoClient(String host, int port) throws IOException 
        // 创建 socket 并连接服务器
        mSocket = new Socket(host, port);
    
 
    public void run() 
        // 和服务端进行通信
    
 
 
    public static void main(String[] argv) 
        try 
            // 由于服务端运行在同一主机,这里我们使用 localhost
            EchoClient client = new EchoClient("localhost", 9877);
            client.run();
         catch (IOException e) 
            e.printStackTrace();
        
    


第三步:通过 socket.getInputStream()/getOutputStream() 获取输入/输出流进行通信

首先,我们来实现服务端:

public class EchoServer 
    // ...
 
    private void handleClient(Socket socket) throws IOException 
        InputStream in = socket.getInputStream();
        OutputStream out = socket.getOutputStream();
        byte[] buffer = new byte[1024];
        int n;
        while ((n = in.read(buffer)) > 0) 
            out.write(buffer, 0, n);
        
    

可以看到,服务端的实现其实很简单,我们不停地读取输入数据,然后写回给客户端。

下面我们看看客户端:

public class EchoClient 
 ... 
    public void run() 
        //单独开辟一个线程进行读取操作
        Thread readThread = new Thread(this::readResponse);
        readThread.start();
       //用户在写入数据,发送给服务器
        try(OutputStream out = mSocket.getOutputStream())
        
            byte[] buffer = new byte[1024];
            int n;
            while ((n = System.in.read(buffer)) > 0) 
                out.write(buffer, 0, n);
            
         catch (IOException e) 
            e.printStackTrace();
        
    


    private void readResponse()
    
        try(InputStream inputStream=mSocket.getInputStream();OutputStream outputStream = mSocket.getOutputStream();)
        
            byte[] buffer=new byte[1024];
            int read=-1;
            while((read=inputStream.read(buffer))!=-1) 
             //打印输出在控制台上
                System.out.write(buffer, 0, read);
            
        
        catch (IOException e) 
            e.printStackTrace();
        
    
    ...

客户端会稍微复杂一点点,在读取用户输入的同时,我们又想读取服务器的响应。所以,这里创建了一个线程来读服务器的响应。

不熟悉 lambda 的读者,可以把Thread readerThread = new Thread(this::readResponse) 换成下面这个代码:

Thread readerThread = new Thread(new Runnable() 
    @Override
    public void run() 
        readResponse();
    
);

打开两个 terminal 分别执行如下命令:


在客户端,我们会看到,输入的所有字符都打印了出来。


Socket、ServerSocket 傻傻分不清楚

在进入这一节的主题前,读者不妨先考虑一个问题:在上一节的实例中,我们运行 echo 服务后,在客户端连接成功时,一共有多少个 socket 存在?

答案是 3 个 socket:客户端一个,服务端有两个。跟这个问题的答案直接关联的是本节的主题——Socket 和 ServerSocket 的区别是什么。

眼尖的读者,可能会注意到在上一节我是这样描述他们的:

在 Java 的 SDK 中,socket 的共有两个接口:用于监听客户连接的 ServerSocket 和用于通信的 Socket。

注意:我只说 ServerSocket 是用于监听客户连接,而没有说它也可以用来通信。下面我们来详细了解一下他们的区别。

注:以下描述使用的是 UNIX/Linux 系统的 API。

首先,我们创建 ServerSocket 后,内核会创建一个 socket。这个 socket 既可以拿来监听客户连接,也可以连接远端的服务。由于 ServerSocket 是用来监听客户连接的,紧接着它就会对内核创建的这个 socket 调用 listen 函数。这样一来,这个 socket 就成了所谓的 listening socket,它开始监听客户的连接。

接下来,我们的客户端创建一个 Socket,同样的,内核也创建一个 socket 实例。内核创建的这个 socket 跟 ServerSocket 一开始创建的那个没有什么区别。不同的是,接下来 Socket 会对它执行 connect,发起对服务端的连接。前面我们说过,socket API 其实是 TCP 层的封装,所以 connect 后,内核会发送一个 SYN 给服务端。

现在,我们切换角色到服务端。服务端的主机在收到这个 SYN 后,会创建一个新的 socket,这个新创建的 socket 跟客户端继续执行三次握手过程。

三次握手完成后,我们执行的 serverSocket.accept() 会返回一个 Socket 实例,这个 socket 就是上一步内核自动帮我们创建的。

所以说:在一个客户端连接的情况下,其实有 3 个 socket。


关于内核自动创建的这个 socket,还有一个很有意思的地方。它的端口号跟 ServerSocket 是一毛一样的。咦!!不是说,一个端口只能绑定一个 socket 吗?其实这个说法并不够准确。

前面我说的TCP 通过端口号来区分数据属于哪个进程的说法,在 socket 的实现里需要改一改。Socket 并不仅仅使用端口号来区别不同的 socket 实例,而是使用 <peer addr:peer port, local addr:local port> 这个四元组。

在上面的例子中,我们的 ServerSocket 长这样:<:, *:9877>。意思是,可以接受任何的客户端,和本地任何 IP。

accept 返回的 Socket 则是这样:<127.0.0.1:xxxx, 127.0.0.1:9877>。其中,xxxx 是客户端的端口号。

如果数据是发送给一个已连接的 socket,内核会找到一个完全匹配的实例,所以数据准确发送给了对端。

如果是客户端要发起连接,这时候只有 <:, *:9877> 会匹配成功,所以 SYN 也准确发送给了监听套接字。


Socket “长”连接的实现

背景知识

Socket 长连接,指的是在客户和服务端之间保持一个 socket 连接长时间不断开。

比较熟悉 Socket 的读者,可能知道有这样一个 API:

socket.setKeepAlive(true);

嗯……keep alive,“保持活着”,这个应该就是让 TCP 不断开的意思。那么,我们要实现一个 socket 的长连接,只需要这一个调用即可。

遗憾的是,生活并不总是那么美好。对于 4.4BSD 的实现来说,Socket 的这个 keep alive 选项如果打开并且两个小时内没有通信,那么底层会发一个心跳,看看对方是不是还活着。

注意:两个小时才会发一次。也就是说,在没有实际数据通信的时候,我把网线拔了,你的应用程序要经过两个小时才会知道。


在说明如何实现长连接前,我们先来理一理我们面临的问题

假定现在有一对已经连接的 socket,在以下情况发生时候,socket 将不再可用:

  1. 某一端关闭 socket:主动关闭的一方会发送 FIN,通知对方要关闭 TCP 连接。在这种情况下,另一端如果去读socket,将会读到 EoF(End of File)。于是我们知道对方关闭了 socket;
  2. 应用程序奔溃:此时 socket 会由内核关闭,结果跟情况1一样;
  3. 系统奔溃:这时候系统是来不及发送 FIN 的,因为它已经跪了。此时对方无法得知这一情况。对方在尝试读取数据时,最后会返回 read time out。如果写数据,则是 host unreachable 之类的错误。
  4. 电缆被挖断、网线被拔:跟情况3差不多,如果没有对 socket 进行读写,两边都不知道发生了事故。跟情况3不同的是,如果我们把网线接回去,socket 依旧可以正常使用。

在上面的几种情形中,有一个共同点就是,只要去读、写 socket,只要 socket 连接不正常,我们就能够知道。基于这一点,要实现一个 socket 长连接,我们需要做的就是不断地给对方写数据,然后读取对方的数据,也就是所谓的心跳。只要心还在跳,socket 就是活的。写数据的间隔,需要根据实际的应用需求来决定。

心跳包不是实际的业务数据,根据通信协议的不同,需要做不同的处理。

比方说,我们使用 JSON 进行通信,那么,可以为协议包加一个 type 字段,表面这个 JSON 是心跳还是业务数据:


    "type": 0,  // 0 表示心跳
 
    // ...

使用二进制协议的情况类似。要求就是,我们能够区别一个数据包是心跳还是真实数据。这样,我们便实现了一个 socket 长连接。


实现示例

这一小节我们一起来实现一个带长连接的 android echo 客户端。

首先是接口部分:

package dhy.com;

public final class LongLiveSocket 
 
    /**
     * 错误回调
     */
    public interface ErrorCallback 
        /**
         * 如果需要重连,返回 true
         */
        boolean onError();
    
 
 
    /**
     * 读数据回调
     */
    public interface DataCallback 
        void onData(byte[] data, int offset, int len);
    
 
 
    /**
     * 写数据回调
     */
    public interface WritingCallback 
        void onSuccess();
        void onFail(byte[] data, int offset, int len);
    
 
 
    public LongLiveSocket(String host, int port,
                          DataCallback dataCallback, ErrorCallback errorCallback) 
    
 
    public void write(byte[] data, WritingCallback callback) 
    
 
    public void write(byte[] data, int offset, int len, WritingCallback callback) 
    
 
    public void close() 
    

我们这个支持长连接的类就叫 LongLiveSocket 好了。如果在 socket 断开后需要重连,只需要在对应的接口里面返回 true 即可(在真实场景里,我们还需要让客户设置重连的等待时间,还有读写、连接的 timeout等。为了简单,这里就直接不支持了。

另外需要注意的一点是,如果要做一个完整的库,需要同时提供阻塞式和回调式API。同样由于篇幅原因,这里直接省掉了。

下面我们直接看实现:

public final class LongLiveSocket 
    private static final String TAG = "LongLiveSocket";
 
    private static final long RETRY_INTERVAL_MILLIS = 3 * 1000;
    private static final long HEART_BEAT_INTERVAL_MILLIS = 5 * 1000;
    private static final long HEART_BEAT_TIMEOUT_MILLIS = 2 * 1000;
 
    /**
     * 错误回调
     */
    public interface ErrorCallback 
        /**
         * 如果需要重连,返回 true
         */
        boolean onError();
    
 
 
    /**
     * 读数据回调
     */
    public interface DataCallback 
        void onData(byte[] data, int offset, int len);
    
 
 
    /**
     * 写数据回调
     */
    public interface WritingCallback 
        void onSuccess();
        void onFail(byte[] data, int offset, int len);
    
 
 
    private final String mHost;
    private final int mPort;
    private final DataCallback mDataCallback;
    private final ErrorCallback mErrorCallback;
 
    private final HandlerThread mWriterThread;
    private final Handler mWriterHandler;
    private final Handler mUIHandler = new Handler(Looper.getMainLooper());
 
    private final Object mLock = new Object();
    private Socket mSocket;  // guarded by mLock
    private boolean mClosed; // guarded by mLock
 
    private final Runnable mHeartBeatTask = new Runnable() 
        private byte[] mHeartBeat = new byte[0];
 
        @Override
        public void run() 
            // 我们使用长度为 0 的数据作为 heart beat
            write(mHeartBeat, new WritingCallback() 
                @Override
                public void onSuccess() 
                    // 每隔 HEART_BEAT_INTERVAL_MILLIS 发送一次
                    mWriterHandler.postDelayed(mHeartBeatTask, HEART_BEAT_INTERVAL_MILLIS);
                    mUIHandler.postDelayed(mHeartBeatTimeoutTask, HEART_BEAT_TIMEOUT_MILLIS);
                
 
                @Override
                public void onFail(byte[] data, int offset, int len) 
                    // nop
                    // write() 方法会处理失败
                
            );
        
    ;
 
    private final Runnable mHeartBeatTimeoutTask = () -> 
        Log.e(TAG, "mHeartBeatTimeoutTask#run: heart beat timeout");
        closeSocket();
    ;
 
 
    public LongLiveSocket(String host, int port,
                          DataCallback dataCallback, ErrorCallback errorCallback) 
        mHost = host;
        mPort = port;
        mDataCallback = dataCallback;
        mErrorCallback = errorCallback;
 
        mWriterThread = new HandlerThread("socket-writer");
        mWriterThread.start();
        mWriterHandler = new Handler(mWriterThread.getLooper());
        mWriterHandler.post(this::initSocket);
    
 
    private void initSocket() 
        while (true) 
            if (closed()) return;
 
            try 
                Socket socket = new Socket(mHost, mPort);
                synchronized (mLock) 
                    // 在我们创建 socket 的时候,客户可能就调用了 close()
                    if (mClosed) 
                        silentlyClose(socket);
                        return;
                    
                    mSocket = socket;
                    // 每次创建新的 socket,会开一个线程来读数据
                    Thread reader = new Thread(new ReaderTask(socket), "socket-reader");
                    reader.start();
                    mWriterHandler.post(mHeartBeatTask);
                
                break;
             catch (IOException e) 
                Log.e(TAG, "initSocket: ", e);
                if (closed() || !mErrorCallback.onError()) 
                    break;
                
                try 
                    TimeUnit.MILLISECONDS.sleep(RETRY_INTERVAL_MILLIS);
                 catch (InterruptedException e1) 
                    // interrupt writer-thread to quit
                    break;
                
            
        
    
 
    public void write(byte[] data, WritingCallback callback) 
        write(data, 0, data.length, callback);
    
 
    public void write(byte[] data, int offset, int len, WritingCallback callback) 
        mWriterHandler.post(() -> 
            Socket socket = getSocket();
            if (socket == null) 
                // initSocket 失败而客户说不需要重连,但客户又叫我们给他发送数据
                throw new IllegalStateException("Socket not initialized");
            
            try 
                OutputStream outputStream = socket.getOutputStream();
                DataOutputStream out = new DataOutputStream(outputStream);
                out.writeInt(len);
                out.write(data, offset, len);
                callback.onSuccess();
             catch (IOException e) 
                Log.e(TAG, "write: ", e);
                closeSocket();
                callback.onFail(data, offset, len);
                if (!closed() && mErrorCallback.onError()) 手把手教你写Linux设备驱动---定时器(基于友善之臂4412开发板)

Android开发之手把手教你写ButterKnife框架

Android开发之手把手教你写ButterKnife框架

手把手教你写!消息中间件可以用在如下哪个场景

手把手教你写网络爬虫:网易云音乐歌单

vue之手把手教你写日历组件