异步 IO 的整洁代码
Posted
技术标签:
【中文标题】异步 IO 的整洁代码【英文标题】:tidy code for asynchronous IO 【发布时间】:2010-10-27 08:35:03 【问题描述】:虽然异步 IO(带有 select/poll/epoll/kqueue 等的非阻塞描述符)不是网络上记录最多的东西,但有一些很好的例子。
但是,所有这些示例在确定了调用返回的句柄后,都只有一个“do_some_io(fd)
”存根。他们并没有真正解释如何以这种方法最好地处理实际的异步 IO。
Blocking IO 非常简洁易读。另一方面,非阻塞的异步 IO 麻烦且杂乱。
有哪些方法?什么是健壮和可读的?
void do_some_io(int fd)
switch(state)
case STEP1:
... async calls
if(io_would_block)
return;
state = STEP2;
case STEP2:
... more async calls
if(io_would_block)
return;
state = STEP3;
case STEP3:
...
或者也许(ab)使用 GCC 的计算 goto:
#define concatentate(x,y) x##y
#define async_read_xx(var,bytes,line) \
concatentate(jmp,line): \
if(!do_async_read(bytes,&var)) \
schedule(EPOLLIN); \
jmp_read = &&concatentate(jmp,line); \
return; \
// macros for making async code read like sync code
#define async_read(var,bytes) \
async_read_xx(var,bytes,__LINE__)
#define async_resume() \
if(jmp_read) \
void* target = jmp_read; \
jmp_read = NULL; \
goto *target; \
void do_some_io()
async_resume();
async_read(something,sizeof(something));
async_read(something_else,sizeof(something_else));
或者可能是 C++ 异常和状态机,因此工作函数可以触发中止/恢复位,或者可能是表驱动的状态机?
我追求的不是如何让它工作,而是如何让它变得可维护!
【问题讨论】:
【参考方案1】:我建议看看:http://www.kegel.com/c10k.html,然后看看现有的库,例如已经完成这项工作的 libevent、Boost.Asio,看看它们是如何工作的。
关键是每种系统调用的方法可能不同:
select 是简单的反应器 epoll 具有需要不同方法的边缘或电平触发接口 iocp 是前摄器,需要其他方法建议:使用良好的现有库,如 C++ 的 Boost.Asio 或 C 的 libevent。
编辑:这就是 ASIO 的处理方式
class connection
boost::asio:ip::tcp::socket socket_;
public:
void run()
// for variable length chunks
async_read_until(socket_,resizable_buffer,'\n',
boost::bind(&run::on_line_recieved,this,errorplacehplder);
// or constant length chunks
async_read(socket_,buffer(some_buf,buf_size),
boost::bind(&run::on_line_recieved,this,errorplacehplder);
void on_line_recieved(error e)
// handle it
run();
;
因为 ASIO 作为前摄者,它会在操作完成时通知您,并且 在内部处理 EWOULDBLOCK。
如果你用反应器的话,你可以模拟这种行为:
class conn
// Application logic
void run()
read_chunk(&conn::on_chunk_read,size);
void on_chunk_read()
/* do something;*/
// Proactor wrappers
void read_chunk(void (conn::*callback),int size, int start_point=0)
read(socket,buffer+start,size)
if( complete )
(this->*callback()
else
this -> tmp_size-=size-read;
this -> tmp_start=start+read;
this -> tmp_callback=callback
your_event_library_register_op_on_readable(callback,socket,this);
void callback()
read_chunk(tmp_callback,tmp_size,tmp_start);
类似的东西。
【讨论】:
libevent 缓冲的事件包装器,甚至带有高低水位线,是一种避免粗暴的 IO 处理的便捷方式;但是你如何在调用它的代码中表示这种状态,它必须是可恢复的? 谢谢你 Artyom,我希望人们通过谷歌搜索 epoll 等找到这个! 你应该把“&run::on_line_received”改成“&connection::on_line_received”【参考方案2】:状态机是一种不错的方法。前面有点复杂,将来会为您省去头疼的问题,因为未来真的很快就开始了。 ;-)
另一种方法是使用线程并在每个线程的单个 fd 上执行阻塞 I/O。此处的权衡是使 I/O 变得简单,但可能在同步中引入复杂性。
【讨论】:
一个用于异步 io 的状态机的快速示例会很有用 我也很想看看这个例子【参考方案3】:存在很好的设计模式“协程”来解决这个问题。
这是两全其美:整洁的代码,完全像同步 io 流和无需上下文切换的出色性能,就像 async io 提供的那样。协程内部看起来像一个普通的同步线程,具有单指令指针。但是许多协程可以在一个 OS 线程中运行(所谓的“协同多任务”)。
示例协程代码:
void do_some_io()
blocking_read(something,sizeof(something));
blocking_read(something_else,sizeof(something_else));
blocking_write(something,sizeof(something));
看起来像同步代码,但实际上控制流使用了另一种方式,像这样:
void do_some_io()
// return control to network io scheduler, to handle another coroutine
blocking_read(something,sizeof(something));
// when "something" is read, scheduler fill given buffer and resume this coroutine
// return control to network io scheduler, to handle another coroutine
CoroSleep( 1000 );
// scheduler create async timer and when it fires, scheduler pass control to this coroutine
...
// and so on
因此,单线程调度程序使用用户定义的代码和对 io 的整洁的类似同步的调用来控制许多协程。
C++协程实现示例是“boost.coroutine”(实际上不是boost的一部分:) http://www.crystalclearsoftware.com/soc/coroutine/ 这个库完全实现了协程机制,可以使用 boost.asio 作为调度器和异步 io 层。
【讨论】:
Boost.Coroutine 现在是 boost 的一部分。【参考方案4】:您需要有一个提供 async_schedule()、async_foreach()、async_tick() 等的主循环。这些函数依次将条目放入全局方法列表中,这些方法将在下次调用 async_tick() 时运行。然后,您可以编写更整洁且不包含任何 switch 语句的代码。
你可以写:
async_schedule(callback, arg, timeout);
或者:
async_wait(condition, callback, arg, timeout);
然后您的条件甚至可以在另一个线程中设置(前提是您在访问该变量时注意线程安全)。
我已经在 C 中为我的嵌入式项目实现了一个异步框架,因为我想要非抢占式多任务处理,而异步非常适合通过在主循环的每次迭代期间做一些工作来完成许多任务。
代码在这里:https://github.com/mkschreder/fortmax-blocks/blob/master/common/kernel/async.c
【讨论】:
【参考方案5】:您希望将“io”与处理分离,此时您阅读的代码将变得非常可读。基本上你有:
int read_io_event(...) /* triggers when we get a read event from epoll/poll/whatever */
/* read data from "fd" into a vstr/buffer/whatever */
if (/* read failed */) /* return failure code to event callback */ ;
if (/* "message" received */) return process_io_event();
if (/* we've read "too much" */) /* return failure code to event callback */ ;
return /* keep going code for event callback */ ;
int process_io_event(...)
/* this is where you process the HTTP request/whatever */
...然后真正的代码在处理事件中,即使您有多个请求响应,它也很可读,您只需在设置状态或其他内容后执行“return read_io_event()”。
【讨论】:
构建缓冲区在行或消息级别上工作得很好;但是当你解析更复杂的东西时,你如何在 process_io_event() 处理程序中表示这种状态? 使用状态机是很自然的。在实践中,这通常是一个标记联合,如struct enum step1_tag, step2_tag, ..., union int step1_data, struct char *s, size_t n step2_data, ...
以上是关于异步 IO 的整洁代码的主要内容,如果未能解决你的问题,请参考以下文章