为 websocket 提升堆栈协程,如何发布函数并从另一个线程恢复

Posted

技术标签:

【中文标题】为 websocket 提升堆栈协程,如何发布函数并从另一个线程恢复【英文标题】:Boost stackful coroutine for websocket, how to post a function and resume to do from a another thread 【发布时间】:2021-01-26 12:12:39 【问题描述】:
int main()

    tcp::socket socket(iocp);
    acceptor.async_accept(socket, yield[ec]);
    if (ec)
        fail(ec, "accept");
    else
        boost::asio::spawn(acceptor.get_executor(), std::bind(&do_session, websocket::stream<beast::tcp_stream>(std::move(socket)), std::placeholders::_1));

    ... iocp run


void do_session(websocket::stream<beast::tcp_stream>& ws, net::yield_context yield)

    while(ws.is_open())
    
        ws.async_read(buffer, yield[ec]);
        ... process the buffer
        ... execute posted callbacks
    


void another_thread()

    while(isAppNotExit)
    
        post_to_specified_coroutine(ws, []()    ... do in courutine same thread );
    

我需要在任意线程中发布一个函数,让指定的协程运行该函数,也就是上面“执行发布的回调”的代码部分。但是,这个任务下达后,协程可能处于 async_read 或 async_write 状态。是否可以发布数据之类的事件并让 async_read 或 async_write 函数立即返回?

【问题讨论】:

如果这样的目标很难,那么如何在任何线程中写入任何客户端websocket? 【参考方案1】:

我猜问题的本质是这样的:在 2 个通道上使用 select:一个容量为 1 的通道和一个(可能)无限容量的通道。

用asio异步操作实现select

编写一个 asio 异步操作来等待多个(两个)事物。 (asio异步操作模板:c++ - How to wait for a function to return with Boost:::Asio? - Stack Overflow)。

受互斥体保护的状态:

std::optional&lt;read_result&gt; std::vector&lt;functor&gt; bool(是否有正在进行的 async_read) std::optional&lt;completion handler&gt;

你的async_wait_for_2_things:

    从完成令牌 (yield[ec]) 中获取完成处理程序(一个可调用的,可以恢复您的协程); 锁定互斥体(使用保护); 如果存在来自 another_thread 的待处理函子,则将其取出,发布完成处理程序; 否则,如果存在未决的 read_result,则将其取出,发布完成处理程序; 否则,如果存在正在进行的 async_read(bool 为真),则存储完成处理程序(如果已存储完成处理程序,则抛出“不可能发生”); else(没有挂起的函子,没有挂起的 read_result,async_read 尚未启动),存储完成处理程序(如果已经存储了完成处理程序,则抛出“不可能发生”),将 bool 设置为 true(如果bool 已经为真,抛出“不可能发生”),调用 async_read; 解锁互斥锁;

async_read 的回调:

    锁定互斥体(使用保护); 将 bool 设置为 false(如果 bool 已经为 false,则抛出“不可能发生”); 如果有完成处理程序,取出来,贴出来; 否则,存储read_result(如果已经存储了read_result,则抛出“不可能发生”); 解锁互斥锁;

另一个线程发布函子的代码:

    锁定互斥体(使用保护); 如果有完成处理程序,取出来,贴出来; 否则,存储函子; 解锁互斥锁;

使用异步事件实现选择

    async_read(使用回调重载)的 lambda 完成处理程序:存储结果,通知 asynchronous_event; another_thread:存储函子,通知异步事件; do_session:异步等待 asynchronous_event,加载结果或仿函数; asynchronous_event 的数据位于受互斥体保护的std::pair&lt;std::optional&lt;read_result&gt;, std::vector&lt;functor&gt;&gt; 中;

使用定时器实现异步事件:c++ - Why does Boost.Asio not support an event-based interface? - Stack Overflow。

这不适用,因为“异步事件”不是“异步条件变量”,它不能:

在异步等待中原子地释放互斥锁

(可能的顺序:do_session 释放互斥锁,然后发布函子,然后通知事件(cancel_one),然后 do_session 等待事件(timer_.async_wait(yield[ec]);)并永远阻塞)


使用异步锁存器实现选择

    async_read(使用回调重载)的lambda handler:①存储结果并重置asynchronous_latch_producer,②通知asynchronous_latch_consumer,等待asynchronous_latch_producer(,⑥唤醒); another_thread:①存储函子并重置asynchronous_latch_producer,②通知asynchronous_latch_consumer,等待asynchronous_latch_producer(,⑥唤醒); do_session:等待asynchronous_latch_consumer(, ③wake up),④加载result或functor并重置asynchronous_latch_consumer,⑤通知asynchronous_latch_producer; asynchronous_latch_consumer 和 asynchronous_latch_producer 的数据在std::pair&lt;std::optional&lt;read_result&gt;, std::vector&lt;functor&gt;&gt;

使用定时器实现异步锁存器:c++ - Cancelling boost asio deadline timer safely - Stack Overflow。修改该异步事件实现以获取异步锁存器:在构造函数和reset.expires_at(Timer::clock_type::time_point::max());在notify_all_one_shot.expires_at(Timer::clock_type::time_point::min())

这不适用,因为其中一个生产者可能会永远阻塞。

【讨论】:

以上是关于为 websocket 提升堆栈协程,如何发布函数并从另一个线程恢复的主要内容,如果未能解决你的问题,请参考以下文章

如何确保清除 ktor websocket 客户端创建的所有 Kotlin 协程?

如何在堆栈上分配数组以获得性能提升?

Go 协程

Python - 如何使用 asyncio 同时运行多个协程?

Golang教程:goroutine协程

如何编写自己的 TCP/IP 或 ISO/OSI 堆栈以使用 WebSockets 而无需每次都使用握手?