如何在套接字关闭时唤醒 select()?

Posted

技术标签:

【中文标题】如何在套接字关闭时唤醒 select()?【英文标题】:How do I wake select() on a socket close? 【发布时间】:2009-08-25 16:37:11 【问题描述】:

我目前正在使用选择循环来管理代理中的套接字。此代理的要求之一是,如果代理向外部服务器发送消息并且在一定时间内没有得到响应,则代理应关闭该套接字并尝试连接到辅助服务器。关闭发生在一个单独的线程中,而选择线程阻塞等待活动。

我无法弄清楚如何检测此套接字是专门关闭的,以便我可以处理故障。如果我在另一个线程中调用 close(),我会得到一个 EBADF,但我不知道哪个套接字关闭了。我试图通过异常 fdset 检测套接字,认为它将包含关闭的套接字,但我没有得到任何返回。我也听说调用 shutdown() 会向服务器发送一个 FIN 并收到一个 FIN,这样我就可以关闭它;但重点是我试图关闭它,因为在超时期限内没有得到响应,所以我也不能这样做。

如果我在这里的假设是错误的,请告诉我。任何想法将不胜感激。

编辑: 针对使用选择超时的建议:我需要异步关闭,因为连接到代理的客户端将超时,我不能等待轮询选择。这只有在我将选择超时设置为非常小的情况下才有效,这将不断轮询并浪费我不想要的资源。

【问题讨论】:

另见***.com/questions/543541/… breaking out from socket select的可能重复 【参考方案1】:

通常我只是在另一个线程中标记要关闭的套接字,然后当 select() 从活动或超时返回时,我运行清理过程并关闭所有死连接并更新 fd_set。以任何其他方式执行此操作会使您面临放弃连接的竞争条件,就像 select() 最终识别出一些数据,然后关闭它,但另一个线程尝试处理检测到的数据并获取不高兴发现连接已关闭。

哦,poll() 通常比 select() 更好,因为不必复制太多数据。

【讨论】:

同意——在 select() 线程中执行 close()。如果您需要让另一个线程检测超时,请通过管道(您可以将其放入选择集中)向 select() 线程发送消息,而不是直接调用 close()。【参考方案2】:

当另一个线程正在或可能正在使用它时,您不能释放一个线程中的资源。在可能正在另一个线程中使用的套接字上调用close 永远不会正常工作。总会有潜在的灾难性竞争条件。

您的问题有两个很好的解决方案:

    让调用select 的线程始终使用不大于您愿意等待处理超时的最长时间。当发生超时时,指示调用select 的线程在从select 返回时会注意到的某个位置。让该线程在调用select 之间执行套接字的实际close

    在套接字上有检测超时调用shutdown 的线程。这将导致select 返回,然后让该线程执行close

【讨论】:

【参考方案3】:

如何在 select() 上处理 EBADF:

int fopts = 0;
for (int i = 0; i < num_clients; ++i) 
    if (fcntl(client[i].fd, F_GETFL, &fopts) < 0) 
        // call close(), FD_CLR(), and remove i'th element from client list
    

此代码假定您有一个客户端结构数组,其中包含套接字描述符的“fd”成员。 fcntl() 调用检查套接字是否仍然“活动”,如果不是,我们将执行我们必须删除死套接字及其相关客户端信息的操作。

【讨论】:

此方法包含竞争条件。可以关闭套接字并以不同的方式重用其 fd(例如服务器连接)。代码仍然可以像处理客户端连接一样处理它。【参考方案4】:

当只看到大象的一小部分时很难发表评论,但也许你把事情复杂化了?

大概你有一些结构来跟踪每个套接字及其信息(比如接收回复的剩余时间)。您可以更改 select() 循环以使用超时。在其中检查是否是时候关闭套接字了。做你需要为关闭做的事情,下次不要将它添加到 fd 集中。

【讨论】:

【参考方案5】:

如果您按照其他答案中的建议使用 poll(2),则可以使用 POLLNVAL 状态,它本质上是 EBADF,但基于每个文件描述符,而不是像 select( 2).

【讨论】:

【参考方案6】:

对选择使用超时,如果 read-ready/write-ready/had-error 序列都是空的(w.r.t 那个套接字),检查它是否被关闭。

【讨论】:

【参考方案7】:

只需在每个可能以零超时关闭的套接字上运行“测试选择”,并检查选择结果和 errno,直到找到已关闭的套接字。

以下演示代码在不同的线程上启动两个服务器套接字,并创建两个客户端套接字以连接到任一服务器套接字。然后它启动另一个线程,它将在 10 秒后随机终止一个客户端套接字(它只会关闭它)。关闭任一客户端套接字会导致选择失败并在主线程中出错,下面的代码现在将测试两个套接字中的哪一个实际上已关闭。

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdint.h>
#include <pthread.h>
#include <stdbool.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>


static void * serverThread ( void * threadArg )

    int res;
    int connSo;
    int servSo;
    socklen_t addrLen;
    struct sockaddr_in soAddr;
    uint16_t * port = threadArg;

    servSo = socket(PF_INET, SOCK_STREAM, 0);
    assert(servSo >= 0);

    memset(&soAddr, 0, sizeof(soAddr));
    soAddr.sin_family = AF_INET;
    soAddr.sin_port = htons(*port);

    // Uncommend line below if your system offers this field in the struct
    // and also needs this field to be initialized correctly.
//  soAddr.sin_len = sizeof(soAddr);

    res = bind(servSo, (struct sockaddr *)&soAddr, sizeof(soAddr));
    assert(res == 0);

    res = listen(servSo, 10);
    assert(res == 0);

    addrLen = 0;
    connSo = accept(servSo, NULL, &addrLen);
    assert(connSo >= 0);

    for (;;) 
        char buffer[2048];
        ssize_t bytesRead;

        bytesRead = recv(connSo, buffer, sizeof(buffer), 0);
        if (bytesRead <= 0) break;

        printf("Received %zu bytes on port %d.\n", bytesRead, (int)*port);
    
    free(port);
    close(connSo);
    close(servSo);
    return NULL;


static void * killSocketIn10Seconds ( void * threadArg )

    int * so = threadArg;

    sleep(10);
    printf("Killing socket %d.\n", *so);
    close(*so);
    free(so);
    return NULL;



int main ( int argc, const char * const * argv )

    int res;
    int clientSo1;
    int clientSo2;
    int * socketArg;
    uint16_t * portArg;
    pthread_t killThread;
    pthread_t serverThread1;
    pthread_t serverThread2;
    struct sockaddr_in soAddr;

    // Create a server socket at port 19500
    portArg = malloc(sizeof(*portArg));
    assert(portArg != NULL);
    *portArg = 19500;
    res = pthread_create(&serverThread1, NULL, &serverThread, portArg);
    assert(res == 0);

    // Create another server socket at port 19501
    portArg = malloc(sizeof(*portArg));
    assert(portArg != NULL);

    *portArg = 19501;
    res = pthread_create(&serverThread1, NULL, &serverThread, portArg);
    assert(res == 0);

    // Create two client sockets, one for 19500 and one for 19501
    // and connect both to the server sockets we created above.

    clientSo1 = socket(PF_INET, SOCK_STREAM, 0);
    assert(clientSo1 >= 0);

    clientSo2 = socket(PF_INET, SOCK_STREAM, 0);
    assert(clientSo2 >= 0);

    memset(&soAddr, 0, sizeof(soAddr));
    soAddr.sin_family = AF_INET;
    soAddr.sin_port = htons(19500);
    res = inet_pton(AF_INET, "127.0.0.1", &soAddr.sin_addr);
    assert(res == 1);

    // Uncommend line below if your system offers this field in the struct
    // and also needs this field to be initialized correctly.
//  soAddr.sin_len = sizeof(soAddr);

    res = connect(clientSo1, (struct sockaddr *)&soAddr, sizeof(soAddr));
    assert(res == 0);

    soAddr.sin_port = htons(19501);
    res = connect(clientSo2, (struct sockaddr *)&soAddr, sizeof(soAddr));
    assert(res == 0);

    // We want either client socket to be closed locally after 10 seconds.
    // Which one is random, so try running test app multiple times.
    socketArg = malloc(sizeof(*socketArg));
    srandomdev();
    *socketArg = (random() % 2 == 0 ? clientSo1 : clientSo2);
    res = pthread_create(&killThread, NULL, &killSocketIn10Seconds, socketArg);
    assert(res == 0);

    for (;;) 
        int ndfs;
        int count;
        fd_set readSet;

        // ndfs must be the highest socket number + 1
        ndfs = (clientSo2 > clientSo1 ? clientSo2 : clientSo1);
        ndfs++;

        FD_ZERO(&readSet);
        FD_SET(clientSo1, &readSet);
        FD_SET(clientSo2, &readSet);

        // No timeout, that means select may block forever here.
        count = select(ndfs, &readSet, NULL, NULL, NULL);

        // Without a timeout count should never be zero.
        // Zero is only returned if select ran into the timeout.
        assert(count != 0);

        if (count < 0) 
            int error = errno;

            printf("Select terminated with error: %s\n", strerror(error));

            if (error == EBADF) 
                fd_set closeSet;
                struct timeval atonce;

                FD_ZERO(&closeSet);
                FD_SET(clientSo1, &closeSet);
                memset(&atonce, 0, sizeof(atonce));
                count = select(clientSo1 + 1, &closeSet, NULL, NULL, &atonce);
                if (count == -1 && errno == EBADF) 
                    printf("Socket 1 (%d) closed.\n", clientSo1);
                    break; // Terminate test app
                

                FD_ZERO(&closeSet);
                FD_SET(clientSo2, &closeSet);
                // Note: Standard requires you to re-init timeout for every
                // select call, you must never rely that select has not changed
                // its value in any way, not even if its all zero.
                memset(&atonce, 0, sizeof(atonce));
                count = select(clientSo2 + 1, &closeSet, NULL, NULL, &atonce);
                if (count == -1 && errno == EBADF) 
                    printf("Socket 2 (%d) closed.\n", clientSo2);
                    break; // Terminate test app
                
            
        
    
    // Be a good citizen, close all sockets, join all threads
    close(clientSo1);
    close(clientSo2);
    pthread_join(killThread, NULL);
    pthread_join(serverThread1, NULL);
    pthread_join(serverThread2, NULL);

    return EXIT_SUCCESS;

运行此测试代码两次的示例输出:

$ ./sockclose 
Killing socket 3.
Select terminated with error: Bad file descriptor
Socket 1 (3) closed.

$  ./sockclose 
Killing socket 4.
Select terminated with error: Bad file descriptor
Socket 1 (4) closed.

但是,如果您的系统支持 poll(),我强烈建议您考虑使用此 API 而不是 select()。 Select 是过去相当丑陋的遗留 API,只是为了与现有代码向后兼容而留在那里。 Poll 为这个任务提供了一个更好的接口,它有一个额外的标志来直接告诉你一个套接字已经在本地关闭:如果这个套接字已经关闭,POLLNVAL 将被设置为revents,无论你在事件中请求了哪些标志,因为POLLNVAL 是仅输出标志,这意味着在events 上设置它时会被忽略。如果套接字没有在本地关闭,但远程服务器刚刚关闭了连接,POLLHUP 标志将在revents 中设置(也是仅输出标志)。 poll 的另一个优点是超时只是一个 int 值(毫秒,对于真正的网络套接字来说足够细),并且对可以监控的套接字数量或其数值范围没有限制。

【讨论】:

这行不通。无法保证套接字仍将关闭。例如,另一个线程可能在 close 之后但在您调用 select 之前调用 socket 并获得相同的描述符。 @DavidSchwartz “例如,另一个线程可能调用套接字......”,然后不要让另一个线程这样做。这是您的应用程序,您可以控制谁可以创建套接字、何时何地。 我写“例如”的原因是因为这只是它可能出错的一种方式。还有其他方法。例如,您一直在使用的新版本库现在可以创建一个调用 socket 的后台线程,当有人升级该库时,您的代码突然中断。

以上是关于如何在套接字关闭时唤醒 select()?的主要内容,如果未能解决你的问题,请参考以下文章

select() 后关闭套接字

在Android Studio中关闭手机上的应用程序时如何不关闭套接字

当我想关闭套接字时如何处理应用程序终止?

发送后如何正确关闭套接字(使用 IOCP)?

Ruby在recv时检测套接字关闭

如何等待任何套接字有数据?