为 websocket 提升堆栈协程,如何发布函数并从另一个线程恢复
Posted
技术标签:
【中文标题】为 websocket 提升堆栈协程,如何发布函数并从另一个线程恢复【英文标题】:Boost stackful coroutine for websocket, how to post a function and resume to do from a another thread 【发布时间】:2021-01-26 12:12:39 【问题描述】:int main()
tcp::socket socket(iocp);
acceptor.async_accept(socket, yield[ec]);
if (ec)
fail(ec, "accept");
else
boost::asio::spawn(acceptor.get_executor(), std::bind(&do_session, websocket::stream<beast::tcp_stream>(std::move(socket)), std::placeholders::_1));
... iocp run
void do_session(websocket::stream<beast::tcp_stream>& ws, net::yield_context yield)
while(ws.is_open())
ws.async_read(buffer, yield[ec]);
... process the buffer
... execute posted callbacks
void another_thread()
while(isAppNotExit)
post_to_specified_coroutine(ws, []() ... do in courutine same thread );
我需要在任意线程中发布一个函数,让指定的协程运行该函数,也就是上面“执行发布的回调”的代码部分。但是,这个任务下达后,协程可能处于 async_read 或 async_write 状态。是否可以发布数据之类的事件并让 async_read 或 async_write 函数立即返回?
【问题讨论】:
如果这样的目标很难,那么如何在任何线程中写入任何客户端websocket? 【参考方案1】:我猜问题的本质是这样的:在 2 个通道上使用 select
:一个容量为 1 的通道和一个(可能)无限容量的通道。
用asio异步操作实现select
编写一个 asio 异步操作来等待多个(两个)事物。 (asio异步操作模板:c++ - How to wait for a function to return with Boost:::Asio? - Stack Overflow)。
受互斥体保护的状态:
std::optional<read_result>
std::vector<functor>
bool
(是否有正在进行的 async_read)
std::optional<completion handler>
你的async_wait_for_2_things
:
-
从完成令牌 (
yield[ec]
) 中获取完成处理程序(一个可调用的,可以恢复您的协程);
锁定互斥体(使用保护);
如果存在来自 another_thread 的待处理函子,则将其取出,发布完成处理程序;
否则,如果存在未决的 read_result,则将其取出,发布完成处理程序;
否则,如果存在正在进行的 async_read(bool 为真),则存储完成处理程序(如果已存储完成处理程序,则抛出“不可能发生”);
else(没有挂起的函子,没有挂起的 read_result,async_read 尚未启动),存储完成处理程序(如果已经存储了完成处理程序,则抛出“不可能发生”),将 bool 设置为 true(如果bool 已经为真,抛出“不可能发生”),调用 async_read;
解锁互斥锁;
async_read 的回调:
-
锁定互斥体(使用保护);
将 bool 设置为 false(如果 bool 已经为 false,则抛出“不可能发生”);
如果有完成处理程序,取出来,贴出来;
否则,存储read_result(如果已经存储了read_result,则抛出“不可能发生”);
解锁互斥锁;
另一个线程发布函子的代码:
-
锁定互斥体(使用保护);
如果有完成处理程序,取出来,贴出来;
否则,存储函子;
解锁互斥锁;
使用异步事件实现选择
-
async_read(使用回调重载)的 lambda 完成处理程序:存储结果,通知 asynchronous_event;
another_thread:存储函子,通知异步事件;
do_session:异步等待 asynchronous_event,加载结果或仿函数;
asynchronous_event 的数据位于受互斥体保护的
std::pair<std::optional<read_result>, std::vector<functor>>
中;
使用定时器实现异步事件:c++ - Why does Boost.Asio not support an event-based interface? - Stack Overflow。
这不适用,因为“异步事件”不是“异步条件变量”,它不能:
在异步等待中原子地释放互斥锁和块(可能的顺序:do_session 释放互斥锁,然后发布函子,然后通知事件(cancel_one
),然后 do_session 等待事件(timer_.async_wait(yield[ec]);
)并永远阻塞)
使用异步锁存器实现选择
-
async_read(使用回调重载)的lambda handler:①存储结果并重置asynchronous_latch_producer,②通知asynchronous_latch_consumer,等待asynchronous_latch_producer(,⑥唤醒);
another_thread:①存储函子并重置asynchronous_latch_producer,②通知asynchronous_latch_consumer,等待asynchronous_latch_producer(,⑥唤醒);
do_session:等待asynchronous_latch_consumer(, ③wake up),④加载result或functor并重置asynchronous_latch_consumer,⑤通知asynchronous_latch_producer;
asynchronous_latch_consumer 和 asynchronous_latch_producer 的数据在
std::pair<std::optional<read_result>, std::vector<functor>>
;
使用定时器实现异步锁存器:c++ - Cancelling boost asio deadline timer safely - Stack Overflow。修改该异步事件实现以获取异步锁存器:在构造函数和reset
、.expires_at(Timer::clock_type::time_point::max())
;在notify_all_one_shot
,.expires_at(Timer::clock_type::time_point::min())
。
这不适用,因为其中一个生产者可能会永远阻塞。
【讨论】:
以上是关于为 websocket 提升堆栈协程,如何发布函数并从另一个线程恢复的主要内容,如果未能解决你的问题,请参考以下文章
如何确保清除 ktor websocket 客户端创建的所有 Kotlin 协程?