C#多线程聊天服务器,处理断开连接

Posted

技术标签:

【中文标题】C#多线程聊天服务器,处理断开连接【英文标题】:C# multithreading chat server, handle disconnect 【发布时间】:2017-12-09 00:50:02 【问题描述】:

我正在寻找一种处理断开连接的方法,因为每次我关闭客户端时,服务器都会停止工作。我收到一条错误消息,在此行中“无法读取超出流的末尾”:

string message = reader.ReadString();

我还需要一种从客户端列表中删除断开连接的客户端的方法。 这是我的代码: 服务器

using System;
using System.Threading;
using System.Net.Sockets;
using System.IO;
using System.Net;
using System.Collections.Generic;

namespace Server

    class Server
    
    public static List<TcpClient> clients = new List<TcpClient>();

    static void Main(string[] args)
    
        IPAddress ip = IPAddress.Parse("127.0.0.1");
        TcpListener ServerSocket = new TcpListener(ip, 14000);
        ServerSocket.Start();

        Console.WriteLine("Server started.");
        while (true)
        
            TcpClient clientSocket = ServerSocket.AcceptTcpClient();
            clients.Add(clientSocket);
            handleClient client = new handleClient();
            client.startClient(clientSocket);
        
    


public class handleClient

    TcpClient clientSocket;
    public void startClient(TcpClient inClientSocket)
    
        this.clientSocket = inClientSocket;
        Thread ctThread = new Thread(Chat);
        ctThread.Start();
    

    private void Chat()
    
        while (true)
        
            BinaryReader reader = new BinaryReader(clientSocket.GetStream());
            while (true)
            
                string message = reader.ReadString();
                foreach (var client in Server.clients)
                
                    BinaryWriter writer = new BinaryWriter(client.GetStream());
                    writer.Write(message);
                
            
        
    


客户

using System;
using System.Net.Sockets;
using System.IO;
using System.Threading;

namespace Client

   class Client
   
       public static void Write()
       
        TcpClient client = new TcpClient("127.0.0.1", 14000);
        while (true)
        
            string str = Console.ReadLine();
            BinaryWriter writer = new BinaryWriter(client.GetStream());
            writer.Write(str);
        
    

    public static void Read()
    
        TcpClient client = new TcpClient("127.0.0.1", 14000);
        while (true)
        
            BinaryReader reader = new BinaryReader(client.GetStream());
            Console.WriteLine(reader.ReadString());
        
    

    static void Main(string[] args)
    
        Thread Thread = new Thread(Write);
        Thread Thread2 = new Thread(Read);
        Thread.Start();
        Thread2.Start();
    


【问题讨论】:

不是您要的问题(Mike Nakis 回答了这个问题),但从多个线程访问 Server.clients 并不安全。 你应该阅读一些文档和教程来展示处理网络代码的正确方法。您的代码示例有许多缺陷,其中最重要的是没有优雅的关闭。即使有,使用BinaryReader 也能保证出现异常,因为这就是它报告流结束的方式。由于您只是使用字符串,因此您应该使用 StreamWriterStreamReader 代替。但是您也应该从要关闭连接的端点使用client.Client.Shutdown(),并遵循优雅的关闭。那么正常情况下不需要异常。 【参考方案1】:

每次我关闭客户端时,服务器都会停止工作。我收到一条错误消息,它“无法读取超出流的末尾”

从某种意义上说,这是完全正常的。也就是说,当使用BinaryReader时,它的正常行为是在到达流尾时抛出EndOfStreamException

为什么会到达流尾?好吧,因为客户端断开了连接,这就是流发生的情况。在套接字级别,真正发生的是读取操作以 0 作为读取的字节数完成。这表明客户端已经优雅地关闭了套接字并且将不再发送任何数据。

在 .NET API 中,这被转换为 TcpClient 用于包装实际处理网络 I/O 的 Socket 对象的 NetworkStream 的末尾。而这个NetworkStream 对象又被你的BinaryReader 对象包裹。而BinaryReader 在到达流的末尾时会抛出该异常。

请注意,您的代码实际上并没有为用户提供关闭客户端的优雅方式。他们将不得不使用 Ctrl+C,或者直接终止进程。使用前者具有执行优雅关闭套接字的偶然效果,但这只是因为 .NET 正在处理进程的终止并在您的对象上运行终结器,例如用于连接到服务器的 TcpClient 对象,以及终结器调用Socket.Shutdown() 告诉服务器它正在关闭。

如果您要终止进程(例如,使用任务管理器),您会发现一个 IOException 被抛出。好的网络代码应该随时准备看到IOException;网络不可靠,并且确实会发生故障。你想做一些合理的事情,比如从你的连接中移除远程端点,而不是让整个程序崩溃。

现在,说了这么多,仅仅因为EndOfStreamException 是“正常的”,这并不意味着您发布的代码是,或者以任何方式是进行网络编程的正确方法的示例。你有很多问题:

    没有明确的优雅闭包。 网络 I/O 提供了一种关闭连接的常规方法,其中包括在两个端点上进行握手以指示它们何时完成发送以及何时完成接收。一个端点将指示它已完成发送;另一个会注意到这一点(使用上面提到的 0 字节读取),然后它本身表明它已完成发送和接收。TcpClientNetworkStream 不要直接公开这个,但你可以使用TcpClient.Client 属性获取Socket 对象以进行更好的优雅关闭,即一个端点可以指示它已完成发送,并且仍然能够等到另一个端点也完成发送。 使用TcpClient.Close() 方法断开连接就像挂断某人的电话而不说“再见”。使用Socket.Shutdown() 就像以礼貌的方式结束一个电话“好吧,这就是我想说的一切......还有什么要说的吗?” 您正在使用BinaryReader,但未正确处理EndOfStreamException。 您的客户端使用两个连接与服务器通信。 网络 I/O 使用支持全双工通信的Socket 对象。无需为了读写而创建第二个连接。一个连接就足够了,而且更好,因为当您将发送和接收分成两个连接时,您还需要向您的协议添加一些内容,以便服务器知道这两个连接代表一个客户端(您的代码会这样做 实际上没有)。 客户端在断开连接时不会从服务器列表中删除(您在问题中提到了这一点)。 客户端列表不是线程安全的。 您的 Chat() 方法中有一个额外的“while (true)”。

我已经修改了您的原始示例以解决上述所有问题,我在这里介绍了这些内容:

服务器程序.cs:

class Program

    private static readonly object _lock = new object();
    private static readonly List<TcpClient> clients = new List<TcpClient>();

    public static TcpClient[] GetClients()
    
        lock (_lock) return clients.ToArray();
    

    public static int GetClientCount()
    
        lock (_lock) return clients.Count;
    

    public static void RemoveClient(TcpClient client)
    
        lock (_lock) clients.Remove(client);
    

    static void Main(string[] args)
    
        IPAddress ip = IPAddress.Parse("127.0.0.1");
        TcpListener ServerSocket = new TcpListener(ip, 14000);
        ServerSocket.Start();

        Console.WriteLine("Server started.");
        while (true)
        
            TcpClient clientSocket = ServerSocket.AcceptTcpClient();
            Console.WriteLine($"client connected: clientSocket.Client.RemoteEndPoint");
            lock (_lock) clients.Add(clientSocket);
            handleClient client = new handleClient();
            client.startClient(clientSocket);

            Console.WriteLine($"GetClientCount() clients connected");
        
    

服务器handleClient.cs:

public class handleClient

    TcpClient clientSocket;

    public void startClient(TcpClient inClientSocket)
    
        this.clientSocket = inClientSocket;
        Thread ctThread = new Thread(Chat);
        ctThread.Start();
    

    private void Chat()
    
        BinaryReader reader = new BinaryReader(clientSocket.GetStream());

        try
        
            while (true)
            
                string message = reader.ReadString();
                foreach (var client in Program.GetClients())
                
                    BinaryWriter writer = new BinaryWriter(client.GetStream());
                    writer.Write(message);
                
            
        
        catch (EndOfStreamException)
        
            Console.WriteLine($"client disconnecting: clientSocket.Client.RemoteEndPoint");
            clientSocket.Client.Shutdown(SocketShutdown.Both);
        
        catch (IOException e)
        
            Console.WriteLine($"IOException reading from clientSocket.Client.RemoteEndPoint: e.Message");
        

        clientSocket.Close();
        Program.RemoveClient(clientSocket);
        Console.WriteLine($"Program.GetClientCount() clients connected");
    

Client Program.cs:

class Program

    private static readonly object _lock = new object();
    private static bool _closed;

    public static void Write(TcpClient client)
    
        try
        
            string str;
            SocketShutdown reason = SocketShutdown.Send;

            while ((str = Console.ReadLine()) != "")
            
                lock (_lock)
                
                    BinaryWriter writer = new BinaryWriter(client.GetStream());
                    writer.Write(str);

                    if (_closed)
                    
                        // Remote endpoint already said they are done sending,
                        // so we're done with both sending and receiving.
                        reason = SocketShutdown.Both;
                        break;
                    
                
            

            client.Client.Shutdown(reason);
        
        catch (IOException e)
        
            Console.WriteLine($"IOException writing to socket: e.Message");
        
    

    public static void Read(TcpClient client)
    
        try
        
            while (true)
            
                try
                
                    BinaryReader reader = new BinaryReader(client.GetStream());
                    Console.WriteLine(reader.ReadString());
                
                catch (EndOfStreamException)
                
                    lock (_lock)
                    
                        _closed = true;
                        return;
                    
                
            
        
        catch (IOException e)
        
            Console.WriteLine($"IOException reading from socket: e.Message");
        
    

    static void Main(string[] args)
    
        TcpClient client = new TcpClient("127.0.0.1", 14000);
        Thread writeThread = new Thread(() => Write(client));
        Thread readThread = new Thread(() => Read(client));
        writeThread.Start();
        readThread.Start();

        writeThread.Join();
        readThread.Join();

        client.Close();
        Console.WriteLine("client exiting");
    

请注意,在大多数情况下,我并没有解决您在代码中使用的不一致和非常规的命名问题。唯一的例外是客户端代码中的线程变量,因为我真的不喜欢与类型名称完全匹配的大写局部变量。

您还有一些其他问题,您的代码的上述修订版没有解决这些问题。其中包括:

    您正在使用BinaryReader。这在很多方面都是一个令人讨厌的类。我建议您切换到使用StreamReader/StreamWriter,尤其是对于您只处理文本的聊天服务器场景。 关注点的耦合/分离不正确。您的 Program 类具有服务器代码,并且服务器代码知道 Program 类。最好将服务器和客户端实现封装到它们自己的类中,与程序的主入口点分开,并进一步将***服务器代码与每个客户端的数据结构解耦(使用 C# 的 @ 987654353@ 允许***服务器代码收到重要事件的通知,例如需要从列表中删除客户端,而无需每个客户端的数据结构必须实际了解***服务器对象,从不注意它的客户名单)。 您应该提供一种机制来正常关闭服务器。

通常,我会说这些超出了这样的答案的范围,这已经很长了。我已经解决了您的代码中的直接问题,然后是一些,这在名义上就足够了。

但是,我一直想写一个我几年前写的basic network programming 示例的更新版本,作为一种“中间”示例,添加多个客户端支持、异步操作,并使用最新的C# 功能(如async/await)。所以,我继续前进并花了一些时间来做到这一点。我想我最终会把它发布到我的博客上……那是另一个项目。同时,这是代码(请注意,这完全是一个从头开始的示例……这样做比尝试重新编写您拥有的代码更有意义)……

此实现的大部分繁重工作都在服务器和客户端共享的单个类中:

/// <summary>
/// Represents a remote end-point for the chat server and clients
/// </summary>
public sealed class ConnectedEndPoint : IDisposable

    private readonly object _lock = new object();
    private readonly Socket _socket;
    private readonly StreamReader _reader;
    private readonly StreamWriter _writer;
    private bool _closing;

    /// <summary>
    /// Gets the address of the connected remote end-point
    /// </summary>
    public IPEndPoint RemoteEndPoint  get  return (IPEndPoint)_socket.RemoteEndPoint;  

    /// <summary>
    /// Gets a <see cref="Task"/> representing the on-going read operation of the connection
    /// </summary>
    public Task ReadTask  get; 

    /// <summary>
    /// Connect to an existing remote end-point (server) and return the
    /// <see cref="ConnectedEndPoint"/> object representing the new connection
    /// </summary>
    /// <param name="remoteEndPoint">The address of the remote end-point to connect to</param>
    /// <param name="readCallback">The callback which will be called when a line of text is read from the newly-created connection</param>
    /// <returns></returns>
    public static ConnectedEndPoint Connect(IPEndPoint remoteEndPoint, Action<ConnectedEndPoint, string> readCallback)
    
        Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);

        socket.Connect(remoteEndPoint);

        return new ConnectedEndPoint(socket, readCallback);
    

    /// <summary>
    /// Asynchronously accept a new connection from a remote end-point
    /// </summary>
    /// <param name="listener">The listening <see cref="Socket"/> which will accept the connection</param>
    /// <param name="readCallback">The callback which will be called when a line of text is read from the newly-created connection</param>
    /// <returns></returns>
    public static async Task<ConnectedEndPoint> AcceptAsync(Socket listener, Action<ConnectedEndPoint, string> readCallback)
    
        Socket clientSocket = await Task.Factory.FromAsync(listener.BeginAccept, listener.EndAccept, null);

        return new ConnectedEndPoint(clientSocket, readCallback);
    

    /// <summary>
    /// Write a line of text to the connection, sending it to the remote end-point
    /// </summary>
    /// <param name="text">The line of text to write</param>
    public void WriteLine(string text)
    
        lock (_lock)
        
            if (!_closing)
            
                _writer.WriteLine(text);
                _writer.Flush();
            
        
    

    /// <summary>
    /// Initiates a graceful closure of the connection
    /// </summary>
    public void Shutdown()
    
        _Shutdown(SocketShutdown.Send);
    

    /// <summary>
    /// Implements <see cref="IDisposable.Dispose"/>
    /// </summary>
    public void Dispose()
    
        _reader.Dispose();
        _writer.Dispose();
        _socket.Close();
    

    /// <summary>
    /// Constructor. Private -- use one of the factory methods to create new connections.
    /// </summary>
    /// <param name="socket">The <see cref="Socket"/> for the new connection</param>
    /// <param name="readCallback">The callback for reading lines on the new connection</param>
    private ConnectedEndPoint(Socket socket, Action<ConnectedEndPoint, string> readCallback)
    
        _socket = socket;
        Stream stream = new NetworkStream(_socket);
        _reader = new StreamReader(stream, Encoding.UTF8, false, 1024, true);
        _writer = new StreamWriter(stream, Encoding.UTF8, 1024, true);

        ReadTask = _ConsumeSocketAsync(readCallback);
    

    private void _Shutdown(SocketShutdown reason)
    
        lock (_lock)
        
            if (!_closing)
            
                _socket.Shutdown(reason);
                _closing = true;
            
        
    

    private async Task _ConsumeSocketAsync(Action<ConnectedEndPoint, string> callback)
    
        string line;

        while ((line = await _reader.ReadLineAsync()) != null)
        
            callback(this, line);
        

        _Shutdown(SocketShutdown.Both);
    

客户端程序将直接使用该类。服务端封装在另外一个类中,与上面同一个DLL中找到:

/// <summary>
/// Event arguments for the <see cref="ChatServer.Status"/> event
/// </summary>
public class StatusEventArgs : EventArgs

    /// <summary>
    /// Gets the status text
    /// </summary>
    public string StatusText  get; 

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="statusText">The status text</param>
    public StatusEventArgs(string statusText)
    
        StatusText = statusText;
    


/// <summary>
/// A server implementing a simple line-based chat server
/// </summary>
public class ChatServer

    private readonly object _lock = new object();
    private readonly Socket _listener;
    private readonly List<ConnectedEndPoint> _clients = new List<ConnectedEndPoint>();
    private bool _closing;

    /// <summary>
    /// Gets a task representing the listening state of the servdere
    /// </summary>
    public Task ListenTask  get; 

    /// <summary>
    /// Raised when the server has status to report
    /// </summary>
    public event EventHandler<StatusEventArgs> Status;

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="port">The port number the server should listen on</param>
    public ChatServer(int port)
    
        _listener = new Socket(SocketType.Stream, ProtocolType.Tcp);
        _listener.Bind(new IPEndPoint(IPAddress.Any, port));
        _listener.Listen(int.MaxValue);
        ListenTask = _ListenAsync();
    

    /// <summary>
    /// Initiates a shutdown of the chat server.
    /// </summary>
    /// <remarks>This method closes the listening socket, which will subsequently
    /// cause the listening task to inform any connected clients that the server
    /// is shutting down, and to wait for the connected clients to finish a graceful
    /// closure of their connections.
    /// </remarks>
    public void Shutdown()
    
        _listener.Close();
    

    private async Task _ListenAsync()
    
        try
        
            while (true)
            
                ConnectedEndPoint client = await ConnectedEndPoint.AcceptAsync(_listener, _ClientReadLine);

                _AddClient(client);
                _CleanupClientAsync(client);
            
        
        catch (ObjectDisposedException)
        
            _OnStatus("Server's listening socket closed");
        
        catch (IOException e)
        
            _OnStatus($"Listening socket IOException: e.Message");
        

        await _CleanupServerAsync();
    

    private async Task _CleanupServerAsync()
    
        ConnectedEndPoint[] clients;

        lock (_lock)
        
            _closing = true;
            clients = _clients.ToArray();
        

        foreach (ConnectedEndPoint client in clients)
        
            try
            
                client.WriteLine("Chat server is shutting down");
            
            catch (IOException e)
            
                _OnClientException(client, e.Message);
            
            client.Shutdown();
        

        // Clients are expected to participate in graceful closure. If they do,
        // this will complete when all clients have acknowledged the shutdown.
        // In a real-world program, may be a good idea to include a timeout in
        // case of network issues or misbehaving/crashed clients. Implementing
        // the timeout is beyond the scope of this proof-of-concept demo code.
        try
        
            await Task.WhenAll(clients.Select(c => c.ReadTask));
        
        catch (AggregateException)
        
            // Actual exception for each client will have already
            // been reported by _CleanupClientAsync()
        
    

    // Top-level "clean-up" method, which will observe and report all exceptions
    // In real-world code, would probably want to simply log any unexpected exceptions
    // to a log file and then exit the process. Here, we just exit after reporting
    // exception info to caller. In either case, there's no need to observe a Task from
    // this method, and async void simplifies the call (no need to receive and then ignore
    // the Task object just to keep the compiler quiet).
    private async void _CleanupClientAsync(ConnectedEndPoint client)
    
        try
        
            await client.ReadTask;
        
        catch (IOException e)
        
            _OnClientException(client, e.Message);
        
        catch (Exception e)
        
            // Unexpected exceptions are programmer-error. They could be anything, and leave
            // the program in an unknown, possibly corrupt state. The only reasonable disposition
            // is to log, then exit.
            //
            // Full stack-trace, because who knows what this exception was. Will need the
            // stack-trace to do any diagnostic work.
            _OnStatus($"Unexpected client connection exception. e");
            Environment.Exit(1);
        
        finally
        
            _RemoveClient(client);
            client.Dispose();
        
    

    private void _ClientReadLine(ConnectedEndPoint readClient, string text)
    
        _OnStatus($"Client readClient.RemoteEndPoint: \"text\"");

        lock (_lock)
        
            if (_closing)
            
                return;
            

            text = $"readClient.RemoteEndPoint: text";

            foreach (ConnectedEndPoint client in _clients.Where(c => c != readClient))
            
                try
                
                    client.WriteLine(text);
                
                catch (IOException e)
                
                    _OnClientException(client, e.Message);
                
            
        
    

    private void _AddClient(ConnectedEndPoint client)
    
        lock (_lock)
        
            _clients.Add(client);
            _OnStatus($"added client client.RemoteEndPoint -- _clients.Count clients connected");
        
    

    private void _RemoveClient(ConnectedEndPoint client)
    
        lock (_lock)
        
            _clients.Remove(client);
            _OnStatus($"removed client client.RemoteEndPoint -- _clients.Count clients connected");
        
    

    private void _OnStatus(string statusText)
    
        Status?.Invoke(this, new StatusEventArgs(statusText));
    

    private void _OnClientException(ConnectedEndPoint client, string message)
    
        _OnStatus($"Client client.RemoteEndPoint IOException: message");
    

也就是说,在大多数情况下,您所需要的一切。上面的 DLL 代码(在我的示例中)被两个不同的程序引用,一个服务器和一个客户端。

这是服务器:

class Program

    private const int _kportNumber = 5678;

    static void Main(string[] args)
    
        ChatServer server = new ChatServer(_kportNumber);

        server.Status += (s, e) => WriteLine(e.StatusText);

        Task serverTask = _WaitForServer(server);

        WriteLine("Press return to shutdown server...");
        ReadLine();

        server.Shutdown();
        serverTask.Wait();
    

    private static async Task _WaitForServer(ChatServer server)
    
        try
        
            await server.ListenTask;
        
        catch (Exception e)
        
            WriteLine($"Server exception: e");
        
    

这是客户:

class Program

    private const int _kportNumber = 5678;

    static void Main(string[] args)
    
        IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Loopback, _kportNumber);
        ConnectedEndPoint server = ConnectedEndPoint.Connect(remoteEndPoint, (c, s) => WriteLine(s));

        _StartUserInput(server);
        _SafeWaitOnServerRead(server).Wait();
    

    private static void _StartUserInput(ConnectedEndPoint server)
    
        // Get user input in a new thread, so main thread can handle waiting
        // on connection.
        new Thread(() =>
        
            try
            
                string line;

                while ((line = ReadLine()) != "")
                
                    server.WriteLine(line);
                

                server.Shutdown();
            
            catch (IOException e)
            
                WriteLine($"Server server.RemoteEndPoint IOException: e.Message");
            
            catch (Exception e)
            
                WriteLine($"Unexpected server exception: e");
                Environment.Exit(1);
            
        )
        
            // Setting IsBackground means this thread won't keep the
            // process alive. So, if the connection is closed by the server,
            // the main thread can exit and the process as a whole will still
            // be able to exit.
            IsBackground = true
        .Start();
    

    private static async Task _SafeWaitOnServerRead(ConnectedEndPoint server)
    
        try
        
            await server.ReadTask;
        
        catch (IOException e)
        
            WriteLine($"Server server.RemoteEndPoint IOException: e.Message");
        
        catch (Exception e)
        
            // Should never happen. It's a bug in this code if it does.
            WriteLine($"Unexpected server exception: e");
        
    

在我看来,上面需要注意的最重要的事情之一是ConnectedEndPointChatServer 类对使用它们的类有 依赖。通过使用回调委托和事件,依赖于这些类的代码能够双向交互,而这些支持类不必知道代码所在的类型(参见“控制反转”,这是一个变体的)。

你越能让你的代码关系看起来像一棵只有单向引用的树,就越容易编写代码并在以后维护它。

注意:为了便于说明,我同时使用了事件和回调委托。任何一种方法都可以单独使用。主要的权衡是复杂性与灵活性。使用事件使代码更加灵活——可以根据需要添加和删除事件处理程序——但是如果使用带有senderEventArgs 参数的方法签名的.NET 约定来实现事件,则它有点“重” -weight”,而不仅仅是在创建相关对象时传递一个简单的回调委托。我在代码中分别放了一个示例,您可以决定在哪种情况下更喜欢哪种方法。

您还会注意到,以上内容大量使用了 C# 的异步特性。起初,这可能会使代码看起来更难阅读。但事实上,使用这些功能让一切正常工作实际上要容易得多,而不是我尝试使用旧的 BeginXXX()/EndXXX() 方法,或者,天堂禁止,为每个连接专用一个线程(缩放 非常很差)。绝对值得习惯以这种方式考虑本质上是异步的操作,例如网络 I/O。

【讨论】:

我正在寻找如此详细的答案!如果您有时间,如果您可以重新编写代码以涵盖最后三点,那就太好了。我对解决方案感到好奇。 @LukasDieser:我通常不会在答案中添加那么多代码,但这是我一直在考虑编写的代码示例,所以我继续做了。请参阅上面的编辑。

以上是关于C#多线程聊天服务器,处理断开连接的主要内容,如果未能解决你的问题,请参考以下文章

如何正确暂停/停止线程?

C# 客户端 - 服务器套接字断开处理

对等体断开连接后未释放SSL内存

c# TCP/IP 自动断开

C# IRC 和 Twitch 空闲断开连接?

连接后立即断开Websocket