为啥套接字被阻止接收,而我睡在另一个线程上?

Posted

技术标签:

【中文标题】为啥套接字被阻止接收,而我睡在另一个线程上?【英文标题】:Why is the socket being blocked from receiving, while I sleep on another thread?为什么套接字被阻止接收,而我睡在另一个线程上? 【发布时间】:2019-02-12 19:37:04 【问题描述】:

我有一个简单的套接字侦听器应用程序。它需要能够接收请求并给出响应,还需要自己发送请求并接收响应。

一旦我的应用程序启动,它将开始在单独的线程中接收并发送响应。这部分工作正常。

但是,当我通过SendRequest()-方法发送请求时,我需要过滤传入的响应,因此正确的响应会发送到之前发出的正确请求。我使用 ResponseHandler 类执行此操作(如下面的代码所示),它允许我注册一个请求,并在收到正确响应后立即通知我注册的请求。但是,放置的请求应该在 10 秒后超时,所以我使用了CountdownEvent,它会等待这 10 秒,但如果响应更早到来,则更早发布。

问题:我的CountdownEvent 总是等待整整 10 秒,只有在那之后,我接收消息的线程才会继续并因此接收响应。 当我在不同的线程上收到时,这怎么可能? 我认为,即使CountdownEvent.Wait() 处于活动状态,我的程序也会继续在那个单独的线程中接收。

注意:在我使用 NetworkTool WireShark 发出请求后,等待的响应确实会立即返回。所以超时是不正确的。

编辑:在一个简单的 WPF 应用程序中,从按钮调用 SendRequest(),它可以工作。不幸的是,这意味着我的大程序是问题所在。

服务:

public class Service

    private readonly ResponseHandler _responseHandler;
    private readonly SyncSocketServer _serverSocket;

    private static readonly int ServerPort = 9090;

    public Service()
    
        _responseHandler = new ResponseHandler();

        _serverSocket = new SyncSocketServer(ServerPort);
        _serverSocket.StartListening();
        _serverSocket.DataReceived += ServerSocket_DataReceived;
    

    public void ServerSocket_DataReceived(object sender, string message)
    
        // Here I left irrelevant code out: Originally, I check here,
        // whether the message is a request or response and so on, and 
        // I only forward the message to the _responseHandler, if it is
        // indeed a response. If it is a request I send an answer.

        string messageId = GetIdFromMessage(message);
        _responseHandler.DataReceived(messageId, message);
    

    public void SendRequest(string message)
    
        string messageId = Guid.NewGuid().ToString();
        string request = CreateRequest(messageId, message);

        _responseHandler.Register(messageId);
        _serverSocket.Send(request);
        string response = _responseHandler.WaitForResponse(messageId);

        Debug.WriteLine("I got the correct response: " + response);
    

SyncSocketServer:

public class SyncSocketServer

    public event EventHandler<string> DataReceived;

    private const int BufferSize = 1024;
    private const string EndDelimiter = "\n";

    private Socket _listenerSocket;
    private Socket _client;
    private string _data;
    private Byte[] _buffer;

    private readonly int _port;

    public SyncSocketServer(int port)
    
        _port = port;
        _buffer = new Byte[BufferSize];
    

    public void StartListening()
    
        IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
        IPAddress ipAddress = ipHostInfo.AddressList[3];
        IPEndPoint localEndPoint = new IPEndPoint(ipAddress, _port);

        _listenerSocket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

        _listenerSocket.Bind(localEndPoint);
        _listenerSocket.Listen(5);

        _client = _listenerSocket.Accept();
        Debug.WriteLine("Local socket opened on: 0", _listenerSocket.LocalEndPoint);

        StartReceiving();
    

    private void StartReceiving()
    
        Thread d = new Thread(() => 
            Thread.CurrentThread.IsBackground = true;
            while (true)
            
                _data = null;

                while (true)
                
                    int bytesReceived = _client.Receive(_buffer);
                    _data += Encoding.ASCII.GetString(_buffer, 0, bytesReceived);

                    if (_data.IndexOf(EndDelimiter, StringComparison.OrdinalIgnoreCase) > -1)
                        break;
                

                Debug.WriteLine("Message received:" + _data);
                OnDataReceived(_data);
            
        );
        d.Start();
    

    public void Send(string message)
    
        byte[] bytesMessage = Encoding.ASCII.GetBytes(message + EndDelimiter);
        _client.Send(bytesMessage);
        Debug.WriteLine("Message sent: " + message);
    

    protected virtual void OnDataReceived(string data)
    
        EventHandler<string> handler = DataReceived;

        if (handler != null)
            handler(this, data);
    

ResponseHandler:

public class ResponseHandler

    private const int WaitForResponseTimeout = 10000;

    private readonly Dictionary<string, PendingRequest> _pendingRequests;

    public ResponseHandler()
    
        _pendingRequests = new Dictionary<string, PendingRequest>();
    

    public void DataReceived(string messageId, string response)
    
        _pendingRequests.TryGetValue(messageId, out var pendingRequest);

        if (pendingRequest == null)
            Debug.WriteLine("Received response for request, that has been removed");
        else
        
            pendingRequest.ResponseReceived(response);
            _pendingRequests.Remove(messageId);
        
    

    public void Register(string messageId)
    
        _pendingRequests.Add(messageId, new PendingRequest());
    

    public string WaitForResponse(string messageId)
    
        _pendingRequests.TryGetValue(messageId, out var pendingRequest);

        if (pendingRequest == null)
            return null;

        pendingRequest.Await();
        return pendingRequest.Response;
    

    private class PendingRequest
    
        public string Response  get; private set; 

        private readonly CountdownEvent _countdownEvent;

        public PendingRequest()
        
            _countdownEvent = new CountdownEvent(1);
        

        public void Await()
        
            // Here, the current thread gets blocked, but
            // I expect, that the thread, where I receive
            // would continue receiving
            _countdownEvent.Wait(WaitForResponseTimeout);
        

        public void ResponseReceived(stringresponse)
        
            Response = response;
            _countdownEvent.Signal();
        
    

【问题讨论】:

用日志输出粘贴所有关键点。检查当...时会发生什么...即使您在发出 CountDownEvent 信号后开始等待,它也应该立即返回...非常奇怪。 @Fildor,这也是我发现的。我从昨天开始调试,也有日志,但我不明白,一定错过了这么简单的东西。我目前尝试构建最简单的程序,出现此问题。 minimal reproducible example 会很棒,是的! 【参考方案1】:

因此,您的 PendingRequestResponseHandler 类正在从不同的线程访问。因此,为了程序的健全性,您需要做几件事:

a) 确保当您从待处理的请求字典中添加和删除请求时,您会获得锁,因为您同时从不同的线程访问共享数据结构。否则你可能会破坏你的数据结构。

b) 您更直接的问题是PendingRequest 中的Await() 方法。您正在调用 CountdownEvent.Wait() 而不验证您的响应是否已设置。如果您的响应已设置,则意味着您将等待 10 秒再处理它。如果您的响应到达,甚至在您调用CountdownEvent.Wait() 之前,就会发生这种情况。在这种情况下,CountdownEvent.Signal() 将被忽略。您应该将PendingRequest.Wait() 更改如下:

while (Response is not set) 
      CountdownEvent.Await();

另外,您的CountdownEvent.Wait() 信号量不需要传递给它的互斥量吗?请记住,您的 Response 对象正在线程之间共享。这是使用 wait() 方法的一般范例:

mutex.lock();
while (Response is not set) 
          CountdownEvent.Await(mutex);
    

// Do your stuff, since your condition is satisfied
mutext.unlock();

【讨论】:

a) 我必须研究一下这个提议。 b) 情况并非如此。照原样,如果在Signal() 之后调用Wait(),导致倒计时已经为0,则Wait() 将立即返回而不等待超时,这是预期的行为。 但是当我在等待时使用互斥锁进行阻塞时,这不会完全导致我想要的结果,即我无法更新 PendingRequest,在等待 10 秒的时候? 条件变量在调用线程暂停执行前放弃wait()中的锁,并在被超时或调用signal()方法唤醒时重新获取锁,然后从wait( ) 方法。【参考方案2】:

问题实际上是错误的假设,即触发事件,就像我在下面所做的那样,会导致火灾并忘记:

protected virtual void OnDataReceived(string data)

    EventHandler<string> handler = DataReceived;

    if (handler != null)
        handler(this, data);

在函数 StartReceiving() 中,我接收数据并将其转发给订阅者,它会在调用时暂停,触发事件并等待所有订阅者完成他们的工作(当然包括等待 10秒的响应)。这导致我的接收线程等待另一个线程。


解决方案是,实现调用,所以它会做火而忘记:

protected virtual void OnDataReceived(string data)

    EventHandler<string> handler = DataReceived;

    if (handler != null)
        handler.BeginInvoke(this, data, null, null);

【讨论】:

以上是关于为啥套接字被阻止接收,而我睡在另一个线程上?的主要内容,如果未能解决你的问题,请参考以下文章

在另一个线程上接收的套接字上触发 EAGAIN

Java 套接字:您可以从一个线程发送并在另一个线程上接收吗?

套接字发送的文件大小为 0 并且访问被阻止,除非我使用 System.exit(0),如何避免这种情况?

C++ 套接字 Send() 线程安全

使用http和原始套接字并通过Play Protect阻止

从不同线程在同一个套接字上发送和接收不起作用