在 *nix 中处理多路复用套接字的读取超时

Posted

技术标签:

【中文标题】在 *nix 中处理多路复用套接字的读取超时【英文标题】:Handling read timeout on multiplexed sockets in *nix 【发布时间】:2014-03-04 13:24:09 【问题描述】:

我正在尝试编写一个程序,该程序应该每 10 分钟一次运行轮询大约 3 到 400 台服务器。

必须发送一个 HTTP 请求并获得响应,解析它并存储在 DB 中。现在我有一个 C 源代码,它实际上可以完美地完成这项工作,只是总有一些服务器表现不佳并阻塞轮询器。我找到了一些有用的示例,展示了如何使用多路复用非阻塞套接字、选择和回调来做我需要的事情。

那么,它的作用是: 1。以非阻塞模式打开套接字:

recstate sckt_open(int *socketfd, const char *address, unsigned int *port)

  struct hostent *desthost;

  *socketfd = -1;

  desthost = gethostbyname(address); // get IP address of the destanation host by DNS name or IP
  if (!desthost || desthost->h_length != sizeof(struct in_addr))
  
    if (verbose >= 2)
      fprintf(stderr, "sckt_open(): cannot resolve %s: unknown host\n",
          address);

    return (FAILED);
  

  *socketfd = socket(AF_INET, SOCK_STREAM, 0); // create an AF_INET stream socket
  if (*socketfd < 0)
  
    if (verbose >= 2)
      fprintf(stderr, "sckt_open(): failed to create socket for %s:%d", address,
          *port);

    return (INTERROR);        // internal error occurred while processing record
  

  int rc;
  if ((rc = fcntl(*socketfd, F_GETFL)) < 0
      || fcntl(*socketfd, F_SETFL, rc | O_NONBLOCK) < 0)
  
    if (verbose >= 2)
      fprintf(stderr,
          "sckt_open(): failed to set nonblocking mode for socket %d for %s:%d",
          *socketfd, address, *port);

    return (INTERROR);        // internal error occurred while processing record
  
  rc = 1;
  if (setsockopt(*socketfd, SOL_SOCKET, SO_KEEPALIVE, (void *) &rc, sizeof(rc))
      < 0)
  
    if (verbose >= 2)
      fprintf(stderr,
          "sckt_open(): failed set keepaliv mode for socket %d for %s:%d",
          *socketfd, address, *port);

    return (INTERROR);        // internal error occurred while processing record
  

  //FIXME
  //    struct timeval timeout;      
  //    timeout.tv_sec = 10;
  //    timeout.tv_usec = 0;
  //    setsockopt(*socketfd, SOL_SOCKET, SO_RCVTIMEO, (struct timeval *)&timeout, sizeof(struct timeval));

  struct sockaddr_in connaddr;
  memset(&connaddr, 0, sizeof(connaddr));
  connaddr.sin_family = AF_INET;                             // set proto family
  connaddr.sin_addr = *(struct in_addr *) desthost->h_addr; // set real destination address
  connaddr.sin_port = htons(*port);                      // set destination port

  if (connect(*socketfd, (struct sockaddr *) &connaddr, sizeof(connaddr)) < 0
      && errno != EINPROGRESS)
  
    if (verbose >= 2)
      fprintf(stderr, "sckt_open(): socket %d: failed to connect to %s:%d",
          *socketfd, address, *port);

    return (FAILED);
  

  if (verbose >= 2)
    printf("sckt_open(): created socket endpoint %d for %s:%d\n", *socketfd,
        address, *port);

  return (ACTIVE);

在 ACTIVE 的情况下,主循环将上传输出缓冲区,然后注册一个写回调。 write 回调函数将尝试实际将缓冲区写入套接字并测试结果: if write return smth.小于 0 但不是 EAGAIN 它将删除与此 fd 对应的所有回调并将服务器标记为 FAILED,以防 write 返回 smth。大于 0,但小于输出缓冲区的长度,它将在激活写入回调(部分写入)时返回。如果写入返回缓冲区长度:它现在将删除写入回调并注册读取回调并返回。

读取回调将通过使用 FIONREAD 为相应的套接字调用 ioctl 来计算操作系统输入缓冲区的大小,然后尝试从套接字读取该数量的字节到本地缓冲区。如果没有分配本地缓冲区并且读取返回0,则读取回调将关闭套接字,报告“0字节响应”并将服务器标记为FAILED,如果smth grater然后读取请求返回0,则读取回调将数据加载到本地缓冲区并返回同时保持读取回调处于活动状态。如果读取请求返回

还有一个重要的细节:我检查套接字是否准备就绪以及需要触发回调的方式:

void sckt_cb_check(void)

  fd_set tread_fds, twrite_fds;
  int counter, ready_fds;

  tread_fds = read_fds;
  twrite_fds = write_fds;

  ready_fds = select(FD_MAX, &tread_fds, &twrite_fds, NULL, NULL); // check for how many file descriptors are ready

  if (ready_fds < 0)
  
    fprintf(stderr, "sckt_cb_check(): select returned an error: %s\n",
        strerror(errno));

    return;
  

  for (counter = 0; ready_fds && counter < FD_MAX; counter++)
  
    if (FD_ISSET(counter, &tread_fds))
    
      ready_fds--;

      if (FD_ISSET(counter, &read_fds))
        read_callback[counter].callback_func(
            read_callback[counter].callback_arg);
    

    if (FD_ISSET(counter, &twrite_fds))
    
      ready_fds--;

      if (FD_ISSET(counter, &write_fds))
        write_callback[counter].callback_func(
            write_callback[counter].callback_arg);
    
  

这就是注册和删除回调的方式:

void sckt_cb_add(int socketfd, scktop operation, void (*func), void *arg)

  struct callback *curr_callback;

  if (socketfd < 0 || socketfd > FD_MAX)
  
    fprintf(stderr,
        "sckt_cb_add(): invalid file descriptor, failed to add new callback\n");

    return;
  

  curr_callback =
      &(operation == WRITE ? write_callback : read_callback)[socketfd];
  curr_callback->callback_func = func;
  curr_callback->callback_arg = arg;
  FD_SET(socketfd, operation == WRITE ? &write_fds : &read_fds);

  if (verbose >= 2)
    printf("sckt_cb_add(): registered %s callback for socket fd %d\n",
        operation == WRITE ? "write" : "read", socketfd);


void sckt_cb_free(int socketfd, scktop operation)

  if (socketfd <= 0 || socketfd > FD_MAX)
  
    fprintf(stderr,
        "sckt_cb_free(): invalid file descriptor, failed to free callback\n");

    return;
  

  FD_CLR(socketfd, operation == WRITE ? &write_fds : &read_fds);

  if (verbose >= 2)
    printf("sckt_cb_free(): removed %s callback for socket fd: %d\n",
        operation == WRITE ? "write" : "read", socketfd);

此外,还有一个 activeconnection 计数器,每次打开套接字时,我都会增加它,每次关闭套接字时,无论出于何种原因,我都会减少它。程序将循环直到该计数器为 0,这意味着无论如何都关闭了所有连接。所以,当这个机制尝试轮询这个虚拟服务器时,它永远不会关闭套接字,我测试了设置套接字超时和选择超时。他们都没有完成这项工作,套接字保持打开状态并且无法确定超时条件(没有读取也没有选择返回错误)。我明白有一个解决方案。最后,我可以将计时器添加到执行 sckt_cb_check() 的轮询循环中,并在一段时间后从该点删除所有这些服务器,但我认为这不是一个好主意。

所以,我发现有一种方法可以使用 pselect,设置超时并指示 EINTR errno,但我不知道如何将它们连接在一起。

UPD:实际上,如果找到虚拟服务器,它将永远留在 sckt_cd_check() 中。这使得我在主循环中使用计时器的解决方案毫无用处。

【问题讨论】:

【参考方案1】:

实际上,通过为时间戳和 select() 超时添加一个单独的字段来检查所有活动的轮询队列,使每个服务器超时工作。

【讨论】:

以上是关于在 *nix 中处理多路复用套接字的读取超时的主要内容,如果未能解决你的问题,请参考以下文章

IO多路复用

select-IO多路复用

Linux select多路复用介绍(转)

select-IO多路复用

#导入Word文档图片# Linux下IO多路复用: Selectpollepoll

非阻塞套接字与IO多路复用