《嵌入操作系统 – 玩转ART-Pi开发板》第9章 基于Select/Poll实现并发服务器

Posted Bruceoxl

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《嵌入操作系统 – 玩转ART-Pi开发板》第9章 基于Select/Poll实现并发服务器相关的知识,希望对你有一定的参考价值。

基于Select/Poll实现并发服务器(一)

9.3 Select/Poll概述

在LWIP中,如果要实现并发服务器,可以基于Sequentaial API来实现,这种方式需要使用多线程,也就是为每个连接创建一个线程来处理数据。而在资源受限的嵌入式设备来说,如果为每个连接都创建一个线程,这种资源的消耗是巨大的,因此,我们需要换一种实现思路,也就是使用IO多路复用的机制来实现,也就是select机制

Select/Poll则是POSIX所规定,一般操作系统或协议栈均有实现。

值得注意的是,poll和select都是基于内核函数sys_poll实现的,不同在于在Linux系统中select是从BSD Unix系统继承而来,poll则是从System V Unix系统继承而来,因此两种方式相差不大。poll函数没有最大文件描述符数量的限制。poll和 select与一样,大量文件描述符的数组被整体复制于用户和内核的地址空间之间,开销随着文件描述符数量的增加而线性增大。

9.3.1 Select函数

在BSD Socket 中,select函数原型如下:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval *timeout);

【参数说明】

  • nfds:select监视的文件句柄数,一般设为要监视各文件中的最大文件描述符值加1。
  • readfds:文件描述符集合监视文件集中的任何文件是否有数据可读,当select函数返回的时候,readfds将清除其中不可读的文件描述符,只留下可读的文件描述符。
  • writefds:文件描述符集合监视文件集中的任何文件是否有数据可写,当select函数返回的时候,writefds将清除其中不可写的文件描述符,只留下可写的文件描述符。
  • exceptfds:文件集将监视文件集中的任何文件是否发生错误,可用于其他的用途,例如,监视带外数据OOB,带外数据使用MSG_OOB标志发送到套接字上。当select函数返回的时候,exceptfds将清除其中的其他文件描述符,只留下标记有OOB数据的文件描述符。
  • timeout 参数是一个指向 struct timeval 类型的指针,它可以使 select()在等待 timeout 时间后若没有文件描述符准备好则返回。其timeval结构用于指定这段时间的秒数和微秒数。它可以使select处于三种状态:

(1) 若将NULL以形参传入,即不传入时间结构,就是将select置于阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止;
(2) 若将时间值设为0秒0毫秒,就变成一个纯粹的非阻塞函数,不管文件描述符是否有变化,都立刻返回继续执行,文件无变化返回0,有变化返回一个正值;
(3) timeout的值大于0,这就是等待的超时时间,即select在timeout时间内阻塞,超时时间之内有事件到来就返回了,否则在超时后不管怎样一定返回,返回值同上述。

timeval 结构体定义

struct timeval
{
    int tv_sec;/* 秒 */
    int tv_usec;/* 微妙 */
};

【返回值】

  • int:若有就绪描述符返回其数目,若超时则为0,若出错则为-1

下列操作用来设置、清除、判断文件描述符集合。

FD_ZERO(fd_set *set);//清除一个文件描述符集。
FD_SET(int fd,fd_set *set);//将一个文件描述符加入文件描述符集中。
FD_CLR(int fd,fd_set *set);//将一个文件描述符从文件描述符集中清除。
FD_ISSET(int fd,fd_set *set);//判断文件描述符是否被置位

fd_set可以理解为一个集合,这个集合中存放的是文件描述符(file descriptor),即文件句柄。中间的三个参数指定我们要让内核测试读、写和异常条件的文件描述符集合。如果对某一个的条件不感兴趣,就可以把它设为空指针。

select()的机制中提供一种fd_set的数据结构,实际上是一个long类型的数组,每一个数组元素都能与打开的文件句柄(不管是Socket句柄,还是其他文件或命名管道或设备句柄)建立联系,建立联系的工作由程序员完成,当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪一Socket或文件可读。

9.3.2 Poll函数

poll的函数原型:

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

【参数说明】

  • fds:fds是一个struct pollfd类型的数组,用于存放需要检测其状态的socket描述符,并且调用poll函数之后fds数组不会被清空;一个pollfd结构体表示一个被监视的文件描述符,通过传递fds指示 poll() 监视多个文件描述符。

struct pollfd原型如下:

typedef struct pollfd {
        int fd;                 // 需要被检测或选择的文件描述符
        short events;           // 对文件描述符fd上感兴趣的事件
        short revents;          // 文件描述符fd上当前实际发生的事件
} pollfd_t;

其中,结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域,结构体的revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。

  • nfds:记录数组fds中描述符的总数量。
  • timeout:指定等待的毫秒数,无论 I/O 是否准备好,poll() 都会返回,和select函数是类似的。

【返回值】

  • int:函数返回fds集合中就绪的读、写,或出错的描述符数量,返回0表示超时,返回-1表示出错;

poll改变了文件描述符集合的描述方式,使用了pollfd结构而不是select的fd_set结构,使得poll支持的文件描述符集合限制远大于select的1024。这也是和select不同的地方。

9.4 LWIP 的select/poll实现

好了,接下来看看LWIP是如何实现select/poll的。

9.4.1 lwip_select实现

目前LWIP已经完全实现select,它是基于信号量的机制来实现的,函数名是lwip_select

LWIP实现Select的基本流程如下:

1.依次检套接字集合中的每个套接字的事件表示,若有效,则记录该套接字。

2.若存在一个或多事件,则返回,否则创建一个信号量并阻塞等待,记录信号量的结构体是select_cb_list,是一个链表,在[sockets.c]文件中定义的:

static struct lwip_select_cb *select_cb_list;//管理select的链表

lwip_select_cb原型如下:

/** Description for a task waiting in select */
struct lwip_select_cb {
  /** Pointer to the next waiting task */
  struct lwip_select_cb *next;
  /** Pointer to the previous waiting task */
  struct lwip_select_cb *prev;
#if LWIP_SOCKET_SELECT
  /** readset passed to select */
  fd_set *readset;
  /** writeset passed to select */
  fd_set *writeset;
  /** unimplemented: exceptset passed to select */
  fd_set *exceptset;
#endif /* LWIP_SOCKET_SELECT */
#if LWIP_SOCKET_POLL
  /** fds passed to poll; NULL if select */
  struct pollfd *poll_fds;
  /** nfds passed to poll; 0 if select */
  nfds_t poll_nfds;
#endif /* LWIP_SOCKET_POLL */
  /** don't signal the same semaphore twice: set to 1 when signalled */
  int sem_signalled;//是否释放信号领
  /** semaphore to wake up a task waiting for select */
  SELECT_SEM_T sem;//select阻塞的信号量
};

3.当套接字集合初始化,会向netconn结构注册回调函数event_callback,当有是事件发生时,回调函数就被被执行,而且回调函数会遍历select_cb_list,如果套接字在select_cb_list中,则select_cb_list释放一个信号量。

好了,接下来看看LWIP的select具体实现,其原型如下:

int lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
            struct timeval *timeout)
{
  u32_t waitres = 0;//记录select等待时间
  int nready;
  fd_set lreadset, lwriteset, lexceptset;//记录发生事件的套接字
  u32_t msectimeout;
  int i;
  int maxfdp2;
#if LWIP_NETCONN_SEM_PER_THREAD
  int waited = 0;
#endif
#if LWIP_NETCONN_FULLDUPLEX
  fd_set used_sockets;
#endif
  SYS_ARCH_DECL_PROTECT(lev);

  LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select(%d, %p, %p, %p, tvsec=%"S32_F" tvusec=%"S32_F")\\n",
                              maxfdp1, (void *)readset, (void *) writeset, (void *) exceptset,
                              timeout ? (s32_t)timeout->tv_sec : (s32_t) - 1,
                              timeout ? (s32_t)timeout->tv_usec : (s32_t) - 1));

  if ((maxfdp1 < 0) || (maxfdp1 > LWIP_SELECT_MAXNFDS)) {
    set_errno(EINVAL);
    return -1;
  }

  lwip_select_inc_sockets_used(maxfdp1, readset, writeset, exceptset, &used_sockets);

  /* Go through each socket in each list to count number of sockets which
     currently match */
  //检测套接字集合中是否发生事件
  nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);

  if (nready < 0) {
    /* one of the sockets in one of the fd_sets was invalid */
    set_errno(EBADF);
    lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
    return -1;
  } else if (nready > 0) {
    /* one or more sockets are set, no need to wait */
    LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\\n", nready));
  } else {
    /* If we don't have any current events, then suspend if we are supposed to */
    if (timeout && timeout->tv_sec == 0 && timeout->tv_usec == 0) {
      LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: no timeout, returning 0\\n"));
      /* This is OK as the local fdsets are empty and nready is zero,
         or we would have returned earlier. */
    } else {
      /* None ready: add our semaphore to list:
         We don't actually need any dynamic memory. Our entry on the
         list is only valid while we are in this function, so it's ok
         to use local variables (unless we're running in MPU compatible
         mode). */
      API_SELECT_CB_VAR_DECLARE(select_cb);
      API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(ENOMEM); lwip_select_dec_sockets_used(maxfdp1, &used_sockets); return -1);
      memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));

      API_SELECT_CB_VAR_REF(select_cb).readset = readset;
      API_SELECT_CB_VAR_REF(select_cb).writeset = writeset;
      API_SELECT_CB_VAR_REF(select_cb).exceptset = exceptset;
#if LWIP_NETCONN_SEM_PER_THREAD
      API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */
      if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {
        /* failed to create semaphore */
        set_errno(ENOMEM);
        lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
        API_SELECT_CB_VAR_FREE(select_cb);
        return -1;
      }
#endif /* LWIP_NETCONN_SEM_PER_THREAD */

      lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));

      /* Increase select_waiting for each socket we are interested in */
      maxfdp2 = maxfdp1;
      for (i = LWIP_SOCKET_OFFSET; i < maxfdp1; i++) {
        if ((readset && FD_ISSET(i, readset)) ||
            (writeset && FD_ISSET(i, writeset)) ||
            (exceptset && FD_ISSET(i, exceptset))) {
          struct lwip_sock *sock;
          SYS_ARCH_PROTECT(lev);
          sock = tryget_socket_unconn_locked(i);
          if (sock != NULL) {
            sock->select_waiting++;//读写异常通知,并且socket是存在的,则会将select_wainting增加1 
            if (sock->select_waiting == 0) {
              /* overflow - too many threads waiting */
              sock->select_waiting--;
              nready = -1;
              maxfdp2 = i;
              SYS_ARCH_UNPROTECT(lev);
              done_socket(sock);
              set_errno(EBUSY);
              break;
            }
            SYS_ARCH_UNPROTECT(lev);
            done_socket(sock);
          } else {
            /* Not a valid socket */
            nready = -1;
            maxfdp2 = i;
            SYS_ARCH_UNPROTECT(lev);
            set_errno(EBADF);
            break;
          }
        }
      }
	  
      if (nready >= 0) {
        /* Call lwip_selscan again: there could have been events between
           the last scan (without us on the list) and putting us on the list! */

//执行完上述操作,再次扫描一次是否有socket有事件产生
        nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
        if (!nready) {
          /* Still none ready, just wait to be woken */
          if (timeout == 0) {
            /* Wait forever */
            msectimeout = 0;
          } else {
            long msecs_long = ((timeout->tv_sec * 1000) + ((timeout->tv_usec + 500) / 1000));
            if (msecs_long <= 0) {
              /* Wait 1ms at least (0 means wait forever) */
              msectimeout = 1;
            } else {
              msectimeout = (u32_t)msecs_long;
            }
          }
          //休眠指定时间,让出cpu控制权
          waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREAD
          waited = 1;
#endif
        }
      }
      
      /* Decrease select_waiting for each socket we are interested in */
      for (i = LWIP_SOCKET_OFFSET; i < maxfdp2; i++) {
        if ((readset && FD_ISSET(i, readset)) ||
            (writeset && FD_ISSET(i, writeset)) ||
            (exceptset && FD_ISSET(i, exceptset))) {
          struct lwip_sock *sock;
          SYS_ARCH_PROTECT(lev);
          sock = tryget_socket_unconn_locked(i);
          if (sock != NULL) {
            /* for now, handle select_waiting==0... */
            LWIP_ASSERT("sock->select_waiting > 0", sock->select_waiting > 0);
            if (sock->select_waiting > 0) {
              sock->select_waiting--;//休眠结束, 将对应socket->select_waiting减1
            }
            SYS_ARCH_UNPROTECT(lev);
            done_socket(sock);
          } else {
            SYS_ARCH_UNPROTECT(lev);
            /* Not a valid socket */
            nready = -1;
            set_errno(EBADF);
          }
        }
      }

      lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));

#if LWIP_NETCONN_SEM_PER_THREAD
      if (API_SELECT_CB_VAR_REF(select_cb).sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {
        /* don't leave the thread-local semaphore signalled */
        sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);
      }
#else /* LWIP_NETCONN_SEM_PER_THREAD */
      sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
      API_SELECT_CB_VAR_FREE(select_cb);

      if (nready < 0) {
        /* This happens when a socket got closed while waiting */
        lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
        return -1;
      }

      if (waitres == SYS_ARCH_TIMEOUT) {
        /* Timeout */
        LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: timeout expired\\n"));
        /* This is OK as the local fdsets are empty and nready is zero,
           or we would have returned earlier. */
      } else {
        /* See what's set now after waiting */
        nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
        LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\\n", nready));
      }
    }
  }

  lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
  set_errno(0);
  if (readset) {
    *readset = lreadset;
  }
  if (writeset) {
    *writeset = lwriteset;
  }
  if (exceptset) {
    *exceptset = lexceptset;
  }
  return nready;
}

以上代码最核心的就是 socket->select_waiting 加1和减1的地方,当socket存在且的确需要监听事件,且并不是进来事件就已经产生或者已经超时,一定会加1;然后线程会有可能会进行休眠;正常情况下,休眠结束后,socket->select_waiting 减1,离开该函数,socket->select_waiting 恢复原值。但是,如果在休眠期间进行了close(socket) ,则通过try_socket(socket) 获取不到socket结构体,则socket->select_waiting不会进行减1,后面执行一系列语句后,退出该函数,socket->select_waiting没有恢复原值,且比进来时大1。针对该函数,socket->select_waiting加1的次数是>=减1的次数,所以如果只要在函数退出时没有恢复原值,则socket->select_waiting永远不可能再减为0了,此时socket资源就出现了假占用,该socket再也不能被其他人使用了。

lwip_select函数实现的具体流程如下:

Select的实现有个重要的结构体lwip_sock,其原型如下:

/** Contains all internal pointers and states used for a socket */
struct lwip_sock {
  /** sockets currently are built on netconns, each socket has one netconn */
  struct netconn *conn;
  /** data that was left from the previous read */
  union lwip_sock_lastdata lastdata;
#if LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL
  /** number of times data was received, set by event_callback(),
      tested by the receive and select functions */
  s16_t rcvevent;
  /** number of times data was ACKed (free send buffer), set by event_callback(),
      tested by select */
  u16_t sendevent;
  /** error happened for this socket, set by event_callback(), tested by select */
  u16_t errevent;
  /** counter of how many threads are waiting for this socket using select */
  SELWAIT_T select_waiting;
#endif /* LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL */
#if LWIP_NETCONN_FULLDUPLEX
  /* counter of how many threads are using a struct lwip_sock (not the 'int') */
  u8_t fd_used;
  /* status of pending close/delete actions */
  u8_t fd_free_pending;
#define LWIP_SOCK_FD_FREE_TCP  1
#define LWIP_SOCK_FD_FREE_FREE 2
#endif

#ifdef SAL_USING_POSIX
  rt_wqueue_t wait_head;
#endif
};

在socket数据接收时,lwip_sock利用netconn相关的接收函数获得一个pbuf(对于TCP)或者一个netbuf(对于UDP)数据,而这二者封装的数据可能大于socket用户指定的数据接收长度,因此在这种情况下,这两个数据包需要暂时保存在socket中,以待用户下一次读取,这里lastdata就用于指向未被用户完全读取的数据包,而lastoffset则指向了未读取的数据在数据包中的偏移。lwip_sock最后的五个字段是为select机制实现时使用的。

lwip_socket是上层Socket API中的实现,它对netconn结构的封装和增强,描述一个具体连接。它基于内核netconn来实现所有逻辑,conn指向了与socket对应的netconn结构。Netconn原型如下:

/** A callback prototype to inform about events for a netconn */
typedef void (* netconn_callback)(struct netconn *, enum netconn_evt, u16_t len);

/** A netconn descriptor */
struct netconn {
  /** type of the netconn (TCP, UDP or RAW) */
  enum netconn_type type;
  /** current state of the netconn */
  enum netconn_state state;
  /** the lwIP internal protocol control block */
  union {
    struct ip_pcb  *ip;
    struct tcp_pcb *tcp;
    struct udp_pcb *udp;
    struct raw_pcb *raw;
  } pcb;
  /** the last asynchronous unreported error this netconn had */
  err_t pending_err;
#if !LWIP_NETCONN_SEM_PER_THREAD
  /** sem that is used to synchronously execute functions in the core context */
  sys_sem_t op_completed;
#endif
  /** mbox where received packets are stored until they are fetched
      by the netconn application thread (can grow quite big) */
  

以上是关于《嵌入操作系统 – 玩转ART-Pi开发板》第9章 基于Select/Poll实现并发服务器的主要内容,如果未能解决你的问题,请参考以下文章

《嵌入操作系统 – 玩转ART-Pi开发板》第9章 基于Select/Poll实现并发服务器

《嵌入操作系统 – 玩转ART-Pi开发板》第9章 基于Select/Poll实现并发服务器

《嵌入操作系统 – 玩转ART-Pi开发板》第8章 APT-Pi双网络实现自动切换

《嵌入操作系统 – 玩转ART-Pi开发板》第8章 APT-Pi双网络实现自动切换

《嵌入式 - Lwip开发指南》第3章 移植LWIP(无系统)

《嵌入式 - Lwip开发指南》第3章 移植LWIP(无系统)