对于 boost io_service,epoll_wait 上是不是只有一个线程阻塞?

Posted

技术标签:

【中文标题】对于 boost io_service,epoll_wait 上是不是只有一个线程阻塞?【英文标题】:For boost io_service, is only-one thread blocked on epoll_wait?对于 boost io_service,epoll_wait 上是否只有一个线程阻塞? 【发布时间】:2016-10-31 08:20:13 【问题描述】:

我看了Boost ASIO的源码,我想知道它调用epoll_wait只有一个线程(当然,如果我使用epoll reactor)。 我想找到关于多个线程调用 epoll_wait 的解决方案,这可能会导致不同的线程同时对同一个套接字进行读取。 我读了一些关键代码如下:

// Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        task_cleanup on_exit =  this, &lock, &this_thread ;
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(!more_handlers, this_thread.private_op_queue);
      

task_是epoll reactor,运行时会调用epoll_wait, 我猜它可能只有一个线程来调用它,因为 op_queue_ 中只有一个“task_operation_”,对吗? 如果我想在多线程中使用epoll,或者我可以使用“EPOLLONESHOT”,这样可以保证一个线程一次处理一个socket。

【问题讨论】:

您无需做任何特别的事情。除非您认为自己比 ASIO 开发人员更了解如何在您的平台上进行多线程处理,否则不要乱用它。这已经是他们能想到的最好的设计了。 【参考方案1】: 第一种情况是当您使用io_service 的单个实例并从多个线程调用io_service::run 方法时。

让我们看看schduler::run 函数(简化):

std::size_t scheduler::run(asio::error_code& ec)

  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;

因此,在持有锁的情况下,它会调用do_run_one 方法,类似于:

std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const asio::error_code& ec)

  while (!stopped_)
  
    if (!op_queue_.empty())
    
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        task_cleanup on_exit =  this, &lock, &this_thread ;
        (void)on_exit;

        task_->run(!more_handlers, this_thread.private_op_queue);
      
      else
      
        //......
      
    
    else
    
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    
  

  return 0;

代码中有趣的部分是以下几行:

if (more_handlers && !one_thread_)
  wakeup_event_.unlock_and_signal_one(lock);
else
  lock.unlock(); 

我们现在讨论的情况是多线程的情况,所以第一个条件会满足(假设我们在 op_queue_ 中有相当多的待处理任务)。

wakeup_event_.unlock_and_signal_one 最终做的是释放/解锁lock 并通知正在等待条件等待的线程之一。所以,有了这个,至少另一个线程(谁得到锁)现在可以调用do_run_one

如您所说,您的task_epoll_reactor。而且,在它的run 方法中,它调用epoll_wait(不持有schedulerlock_)。

这里有趣的是它在遍历epoll_wait 返回的所有就绪描述符时所做的事情。它将它们推回它作为参数中的引用接收的操作队列中。现在推送的操作的运行时类型为 descriptor_state 而不是 task_operation_

for (int i = 0; i < num_events; ++i)
  
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    
      // don't call work_started() here. This still allows the scheduler to
      // stop if the only remaining operations are descriptor operations.
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      descriptor_data->set_ready_events(events[i].events);
      ops.push(descriptor_data);
    
  

因此,在 scheduler::do_run_one 内的 while 循环的下一次迭代中,对于已完成的任务,它将到达 else 分支(我之前在粘贴中省略了该分支):

     else
      
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit =  this, &lock, &this_thread ;
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);

        return 1;
      

调用complete 函数指针,这又可能会调用用户传递给async_readasync_write API 的句柄。

第二种情况,是您创建一个io_service 对象池并在一个或多个线程上调用其run 方法,即io_servicethread 之间的映射可以是1:1 或1:N 为可能适合您的应用。这样,您可以以循环方式将 io_service 对象分配给 soucket 对象。

现在,来回答你的问题:

如果我想在多线程中使用 epoll,或者我可以使用“EPOLLONESHOT” 这样就可以保证一个线程一次处理一个socket。

如果我理解正确,您想使用 1 个线程将所有事件处理到套接字吗?我认为这可以通过遵循方法 2 来实现,即创建一个 io_service 对象池并将其映射到 1 个线程。通过这种方式,您可以确保特定套接字上的所有活动将仅由一个线程处理,即io_service:run 所在的线程。

您不必担心在上述情况下设置EPOLLONESHOT

我不太确定使用第一种方法获得相同的行为,即多线程和 1 io_service

但是,如果您根本不使用线程,即您的 io_service 在单线程上运行,那么您不必担心这一切,毕竟 asio 的目的是抽象所有这些东西。

【讨论】:

感谢您写了这么多。它对我有很大帮助。非常感谢。 @NeoLiu 就像关于 *** 的一般信息 - 如果您发现任何对您提出的问题确实有帮助的答案,您应该通过单击柜台下方的“检查”标志“接受”它(右上角的答案)。我看到你到目前为止还没有接受任何答案,既然你现在知道了,你就可以开始接受它们了:)【参考方案2】:

只有一个线程会调用epoll_wait。一旦线程接收到描述符的事件通知,它会将描述符解复用到运行io_service 的所有线程。根据Platform-Specific Implementation Notes:

线程:

使用epoll 的多路分解在调用io_service::run()io_service::run_one()io_service::poll()io_service::poll_one() 的线程之一中执行。

单个描述符将由执行 I/O 的单个线程处理。因此,当使用异步操作时,I/O 不会为给定的套接字同时执行。

【讨论】:

以上是关于对于 boost io_service,epoll_wait 上是不是只有一个线程阻塞?的主要内容,如果未能解决你的问题,请参考以下文章

Boost Asio总结 io_service

对于带有单个接受器的线程 boost::asio 服务器,我们是不是需要每个线程多个 io_service

boost::asio io_service 停止特定线程

Boost asio基本概念

Boost::Asio : io_service.run() vs poll() 或者我如何在主循环中集成 boost::asio

boost::asio::io_service类