Socket网络编程学习笔记 (10)简易聊天室案例

Posted 鮀城小帅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Socket网络编程学习笔记 (10)简易聊天室案例相关的知识,希望对你有一定的参考价值。

1. 聊天室数据传输设计

  • 必要条件:客户端、服务器
  • 必要约束:数据传输协议
  • 原理:服务器监听消息来源、客户端链接服务器并发送消息到服务器

1.1 客户端、服务器数据交互

client 发送消息到服务器端,服务器端回复消息也就是回送消息。

1.2 数据传输协议

如上图,数据在传输的时候,需要在尾部追加换行符,也就是说原来5个字节的数据,在实际传输时,是有6个字节长度的。

1.3 服务器、多客户端模型

在客户端有多个情况下,客户端都会向服务器端进行发送消息;想要在PC发送消息给服务器端时,也让安卓、平板等终端都能收到,其操作应该是,当PC端发送一条消息到服务器端之后,服务器端得到该数据后,它会把这条数据发送(回送)给当前连接的客户端。而这些当前连接的客户端收到这条消息后,就实现了把PC消息发送到手机的过程。

2. 客户端如何发送消息到另外一个客户端

  • 每个客户端都是服务器也是客户端? 答:不是
  • 2个以上设备如何交互数据?  答:约定一个基础的数据格式,这里使用回车换行符来作为信息的截断
  • 客户端-服务器-转发到客户端,如下图:
    • User1发送消息到服务端,服务端将消息转发给其他的客户端(比如User2),从而实现聊天室的功能

3. 聊天室消息接收实现

3.1 结构

代码分为四个module,分别为lib-clink、sample-client、sample-foo、sample-server。

(1)lib-clink

该module为提供工具类进行校验与流处理。

(2)sample-client ,客户端代码,需要依赖 lib-clink、sample-foo两个module

(3)sample-foo ,基础的共用类代码

 

 (4)sample-server,服务端代码,需要依赖 lib-clink、sampl-foo两个module

(5)lib-clink、sample-foo的工具类、基础数据类参考前面 TCP点对点传输的代码逻辑

3.2 sample-client

初版代码和TCP点对点传输的基本一致,聊天室主要在TCPServer端进行转发,所以Client不需要代码重构。

3.3 sample-server

初版代码和TCP点对点传输的基本一致,要实现聊天室消息接收则需要进行重构。主要重构 TCPServer.java 、ClientHandler.java类。

(1)ClientHandler.java - 消息转发

原有的消息在收到后就只是打印到控制台

// 打印到屏幕
System.out.println(str);

而实现聊天室功能需要将收到的消息进行通知出去。这里可以通过 CloseNotify() 接口进行实现。这里对该接口进行改造,并新增转发的接口方法来将消息通知回去。

    /**
     * 消息回调
     */
    public interface ClientHandlerCallback 
        // 自身不安比通知
        void onSelfClosed(ClientHandler handler);
        // 收到消息时通知
        void onNewMessageArrived(ClientHandler handler,String msg);
    

在将消息打印到屏幕的同时,将消息通知出去:

       // 打印到屏幕
       System.out.println(str);
       clientHandlerCallback.onNewMessageArrived(ClientHandler.this,str);

调用onNewMessageArrived()方法从而进行转发。这里主要是把当前收到的消息传递回去,同时也要把自身传递回去。

(2)自身描述信息的构建

新增clientInfo类变量:

    private final String clientInfo;

自身描述信息初始化:

    public ClientHandler(Socket socket, ClientHandlerCallback clientHandlerCallback) throws IOException 
        this.socket = socket;
        this.readHandler = new ClientReadHandler(socket.getInputStream());
        this.writeHandler = new ClientWriteHandler(socket.getOutputStream());
        this.clientHandlerCallback = clientHandlerCallback;
        // 新增自身描述信息
        this.clientInfo = "A[" + socket.getInetAddress().getHostAddress() + "] P[" + socket.getPort() + "]";
        System.out.println("新客户端连接:" + clientInfo);
    
    public String getClientInfo() 
        return clientInfo;
    

(3)重构TCPServer.java
重构 clientHandler.ClientHandlerCallback的两个回调方法,这里要将之提到TCPServer.java类上。

让TCPServer.java 实现 clientHandler.ClientHandlerCallback接口。并实现两个方法:

    @Override
    public synchronized void onSelfClosed(ClientHandler handler) 
    

    @Override
    public void onNewMessageArrived(ClientHandler handler, String msg) 
    

并将 客户端构建溢出线程的remove操作迁移到  onSelfClosed() 方法实现内:

    @Override
    public synchronized void onSelfClosed(ClientHandler handler) 
        clientHandlerList.remove(handler);
    

原有的ClientHandler异步线程处理逻辑如下:

        // 客户端构建异步线程
        ClientHandler clientHandler = new ClientHandler(client,
                    handler -> clientHandlerList.remove(handler));

重构后,如下:

    // 客户端构建异步线程
    ClientHandler clientHandler = new ClientHandler(client,TCPServer.this);

消息转发:

    /**
     * 转发消息给其他客户端
     * @param handler
     * @param msg
     */
    @Override
    public void onNewMessageArrived(ClientHandler handler, String msg) 
        // 打印到屏幕
        System.out.println("Received-" + handler.getClientInfo() + ":" + msg);
        // 转发
        forwardingThreadPoolExecutor.execute(()->
             for (ClientHandler clientHandler : clientHandlerList)
                 if(clientHandler.equals(handler))
                      // 跳过自己
                      continue;
                 
                 // 向其他客户端投递消息
                 clientHandler.send(msg);
            
        );
    

(4)基于synchronized 解决多线程操作的安全问题:

由于这里有对 clientHandlerList集合的删除、添加、遍历等操作,这涉及到对所有客户端的操作,在多线程的环境下,默认的List不是线程安全的,所以存在多线程的安全问题。

    public void stop() 
        if (mListener != null) 
            mListener.exit();
        
        synchronized (TCPServer.this)
            for (ClientHandler clientHandler : clientHandlerList) 
                clientHandler.exit();
            
            clientHandlerList.clear();
        

        // 停止线程池
        forwardingThreadPoolExecutor.shutdownNow();
    

    public synchronized void broadcast(String str) 
        for (ClientHandler clientHandler : clientHandlerList) 
            clientHandler.send(str);
        
    

    /**
     * 删除当前消息
     * @param handler
     */
    @Override
    public synchronized void onSelfClosed(ClientHandler handler) 
        clientHandlerList.remove(handler);
    

    /**
     * 转发消息给其他客户端
     * @param handler
     * @param msg
     */
    @Override
    public void onNewMessageArrived(ClientHandler handler, String msg) 
        // 打印到屏幕
        System.out.println("Received-" + handler.getClientInfo() + ":" + msg);
        // 转发
        
    

这里加类锁来保证删除操作的线程安全。

关于添加操作的线程安全问题解决如下:

          try 
              // 客户端构建异步线程
              ClientHandler clientHandler = new ClientHandler(client,TCPServer.this);
              // 读取数据并打印
              clientHandler.readToPrint();
              // 添加同步处理
              synchronized (TCPServer.this) 
                  clientHandlerList.add(clientHandler);
              
           catch (IOException e) 
              e.printStackTrace();
              System.out.println("客户端连接异常:" + e.getMessage());
          

(5)异步转发

        // 转发
        clientHandlerCallback.onNewMessageArrived(ClientHandler.this,str);

 在ClientHandler.java中,上述代码所在的线程是主要线程,会一直有消息进来,所以不能做同步处理,那样会导致当前线程阻塞,从而导致后面进来的消息无法及时处理。

所以当 onNewMessageArrived()将消息抛出去之后,TCPServer.java的实现要采取异步转发的方式退给其他客户端。创建一个新的单例线程池来做转发的操作:

新增转发线程池:

    // 转发线程池
    private final ExecutorService forwardingThreadPoolExecutor;

    public TCPServer(int port) 
        this.port = port;
        this.forwardingThreadPoolExecutor = Executors.newSingleThreadExecutor();
    

转发投递消息给其他客户端:

    /**
     * 转发消息给其他客户端
     * @param handler
     * @param msg
     */
    @Override
    public void onNewMessageArrived(ClientHandler handler, String msg) 
        // 打印到屏幕
        System.out.println("Received-" + handler.getClientInfo() + ":" + msg);
        // 转发
        forwardingThreadPoolExecutor.execute(()->
            synchronized (TCPServer.this)
                for (ClientHandler clientHandler : clientHandlerList)
                    if(clientHandler.equals(handler))
                        // 跳过自己
                        continue;
                    
                    // 向其他客户端投递消息
                    clientHandler.send(msg);
                
            
        );
    

防止客户端下线后,依旧重复发送的问题:

ClientHandler.java - ClientWriteHandler
       /**
         * 发送到客户端
         * @param str
         */
        void send(String str) 
            // 如果已经发送完成,就返回
            if(done)
                return;
            
            executorService.execute(new WriteRunnable(str));
        

4. 聊天室Server/Client启动、测试

启动一个Server和三个Client:

如上图,Server启动后,每启动一个Client都会将Client信息加入到Server的list列表,每次由Server进行转发或发送消息给各个终端。

如上图,是Server和3个Client之间的模拟聊天。此时在Server中,当user1发送消息时,Server将消息本地读取之后,再转发给其他的两个非当前client对象。如果是由Server回复的,则当前client为空,会给所有的Client都发送消息。

综上,根据前面的思路、逻辑,实现了一个简单的聊天室消息转发的功能。

5. 服务器状态与测试用例构建

5.1 服务器状态-繁忙

  • 每个客户端都需要服务器进行双通等待
  • 双通:客户端发送数据到服务器的接收通道
  • 双通:服务器回送消息的发送通道
  • 每条通道因为堵塞只能使用异步线程实现

5.2 服务器线程数量

  • 一个客户端:双通 -> 2条线程
  • n个客户端:2n条线程
  • 服务器实际线程数量:2n+

5.3 客户端压测重构

(1)压测逻辑 main() 方法:

package net.qiujuer.lesson.sample.client;

import net.qiujuer.lesson.sample.client.bean.ServerInfo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @ClassName ClientTest
 * @Description TODO
 * @Author wushaopei
 * @Date 2022/2/28 10:12
 * @Version 1.0
 */
public class ClientTest 
    private static boolean done = false;

    public static void main(String[] args) throws IOException 
        ServerInfo info = UDPSearcher.searchServer(10000);
        System.out.println("Server:" + info);
        if(info == null)
            return;
        

        // 当前连接数量
        int size = 0;
        List<TCPClient> tcpClients = new ArrayList<>();
        for (int i = 0; i < 1000; i++)
            try 
                TCPClient tcpClient = TCPClient.startWith(info);
                if (tcpClient == null)
                    System.out.println("连接异常");
                    continue;
                
                System.out.println("连接成功:" + (++size));
            catch (IOException e)
                System.out.println("连接异常");
            
            try 
                Thread.sleep(20);
            catch (InterruptedException e)
                e.printStackTrace();
            
        

        System.in.read();
        Runnable runnable = new Runnable() 
            @Override
            public void run() 
                while (!done)
                    for (TCPClient tcpClient : tcpClients)
                        tcpClient.send("Hello~");
                    
                    try 
                        Thread.sleep(1000);
                    catch (InterruptedException e)
                        e.printStackTrace();
                    
                
            
        ;
        Thread thread = new Thread(runnable);
        thread.start();

        System.in.read();
        // 等待线程完成
        done = true;
        try 
            thread.join();
        catch (InterruptedException e)
            e.printStackTrace();
        

        // 客户端结束操作
        for (TCPClient tcpClient : tcpClients)
            tcpClient.exit();
        
    

(2)重构原有的main()启动逻辑:

package net.qiujuer.lesson.sample.client;


import net.qiujuer.lesson.sample.client.bean.ServerInfo;

import java.io.*;

public class Client 
    public static void main(String[] args) 
        ServerInfo info = UDPSearcher.searchServer(10000);
        System.out.println("Server:" + info);

        if (info != null) 
            try 
                TCPClient tcpClient = TCPClient.startWith(info);
                if (tcpClient == null)
                    return;
                
                write(tcpClient);
             catch (IOException e) 
                e.printStackTrace();
            
        
    

    private static void write(TCPClient tcpClient) throws IOException 

        // 构建键盘输入流
        InputStream in = System.in;
        BufferedReader input = new BufferedReader(new InputStreamReader(in));

        do 
            // 键盘读取一行
            String str = input.readLine();
            // 发送到服务器
            tcpClient.send(str);

            if ("00bye00".equalsIgnoreCase(str)) 
                break;
            
         while (true);
    

Client的写操作在main() 调用write() 方法去send() 实现。

TCPClient.java 重构

    public static TCPClient startWith(ServerInfo info) throws IOException 
        Socket socket = new Socket();
        // 超时时间
        socket.setSoTimeout(3000);

        // 连接本地,端口2000;超时时间3000ms
        socket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()), info.getPort()), 3000);

        System.out.println("已发起服务器连接,并进入后续流程~");
        System.out.println("客户端信息:" + socket.getLocalAddress() + " P:" + socket.getLocalPort());
        System.out.println("服务器信息:" + socket.getInetAddress() + " P:" + socket.getPort());

        try 
            ReadHandler readHandler = new ReadHandler(socket.getInputStream());
            readHandler.start();
            return new TCPClient(socket,readHandler);

         catch (Exception e) 
            System.out.println("异常关闭");
            CloseUtils.close(socket);
        

        return null;
    

在压测 ClientTest.java 的main() 方法中,启动时创建1000个客户端去连接服务器端。

5.4 压测日志:

(1)分别启动server和client压测包

(2)Client创建1000个客户端链接服务器端:

 (3)启动阶段分析: 内存、CPU、线程

 如上图,压测环境下,CPU的占用最高不过10%左右,而堆内存的占用在 50MB~100MB之间浮动,而线程数则在1012个之间。多出的12个线程应该是JVM的守护线程或非业务线程。

(4)发送消息的阶段: 内存、CPU、线程

服务端发送消息到客户端,会再创建1000个线程。

此时CPU使用率在10%以内,堆内存占用为 60~100MB之间,线程数达到2011个。

(5)服务端关闭

性能数据如下:

此时,CPU占用率在断开的瞬间达到12%左右,活动的线程数释放后只剩下8个线程。

5.5 服务器性能数据分析

CPU:取决于数据的频繁性、数据的转发复杂性

内存:取决于客户端的数量、客户端发送的数据大小

5.6 服务器优化方案分析

首先,要减少线程数量。目前是有2N的线程,其中1N的线程用来处理链接,1N用来维护客户端的发送。可以将维护客户端的1N线程改为使用16个的线程池。这样 1N+16 的线程消耗要远远小于 2N的线程消耗。

其次,增加线程执行繁忙状态。目前的线程只是在客户端来数据时去拿数据处理,而大部分的时间都消耗在了等待上。而不是真实的去完成数据,而这2N的输出线程池的队列在大部分时间下是出于空闲状态。应该增加其繁忙状态,减少资源的浪费。

最后,使用客户端Buffer复用机制,优化内存的消耗。

以上是关于Socket网络编程学习笔记 (10)简易聊天室案例的主要内容,如果未能解决你的问题,请参考以下文章

Java学习网络编程全总结——TCPUdp多线程IO流Socket简易在线咨询聊天室Java爬虫

第94题JAVA高级技术-网络编程13(简易聊天室8:使用Socket传递图片)

第95题JAVA高级技术-网络编程14(简易聊天室9:使用Socket传递音频)

第92题JAVA高级技术-网络编程11(简易聊天室6:使用Socket通信)

第93题JAVA高级技术-网络编程12(简易聊天室7:使用Socket传递对象)

第93题JAVA高级技术-网络编程12(简易聊天室7:使用Socket传递对象)