为啥 io_context 出现在我的 boost asio 协程服务器中
Posted
技术标签:
【中文标题】为啥 io_context 出现在我的 boost asio 协程服务器中【英文标题】:Why io_context sopped in my boost asio coroutine server为什么 io_context 出现在我的 boost asio 协程服务器中 【发布时间】:2020-03-12 23:10:07 【问题描述】:我的服务器基于 boost spawn echo 服务器示例,并在 this thread 中进行了改进。真正的服务器很复杂,我做了一个更简单的服务器来说明问题:
服务器监听 12345 端口,从新连接接收 0x4000 字节数据。
客户端运行1000个线程,连接服务器,发送0x4000字节数据。
问题:当客户端运行时,1秒后通过控制台中的Ctrl-C杀死客户端进程,然后服务器的io_context
将停止,服务器运行到无限循环并消耗 100% cpu。如果这没有发生,重复启动客户端并杀死它几次,它就会发生。可能几次后它会耗尽 TCP 端口,等几分钟再试一次,它会在我的机器上杀死客户端 3~15 次后发生。
boost document 表示io_context.stopped()
用于判断是否停止
要么通过显式调用 stop(),要么由于工作用完
我从不调用io_context.stop()
,并使用make_work_guard(io_context)
保持io_context
不停止,但为什么它仍然停止?
我的环境:Win10-64bit, boost 1.71.0
服务器代码:
#include <iostream>
using namespace std;
#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;
namespace ba=boost::asio;
#define SERVER_PORT 12345
#define DATA_LEN 0x4000
struct session : public std::enable_shared_from_this<session>
tcp::socket socket_;
boost::asio::steady_timer timer_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
explicit session(boost::asio::io_context& io_context, tcp::socket socket)
: socket_(std::move(socket)),
timer_(io_context),
strand_(io_context.get_executor())
void go()
auto self(shared_from_this());
boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
spawn(yield, [this, self](ba::yield_context yield)
timer_.expires_from_now(10s); // 10 second
while (socket_.is_open())
boost::system::error_code ec;
timer_.async_wait(yield[ec]);
// timeout triggered, timer was not canceled
if (ba::error::operation_aborted != ec)
socket_.close();
);
try
// recv data
string packet;
// read data
boost::system::error_code ec;
ba::async_read(socket_,
ba::dynamic_buffer(packet),
ba::transfer_exactly(DATA_LEN),
yield[ec]);
if(ec)
throw "read_fail";
catch (...)
cout << "exception" << endl;
timer_.cancel();
socket_.close();
);
;
struct my_server
my_server()
~my_server()
void start()
ba::io_context io_context;
auto worker = ba::make_work_guard(io_context);
ba::spawn(io_context, [&](ba::yield_context yield)
tcp::acceptor acceptor(io_context,
tcp::endpoint(tcp::v4(), SERVER_PORT));
for (;;)
boost::system::error_code ec;
tcp::socket socket(io_context);
acceptor.async_accept(socket, yield[ec]);
if (!ec)
std::make_shared<session>(io_context, std::move(socket))->go();
);
// Run io_context on All CPUs
auto thread_count = std::thread::hardware_concurrency();
boost::thread_group tgroup;
for (auto i = 0; i < thread_count; ++i)
tgroup.create_thread([&]
for (;;)
try
if (io_context.stopped()) // <- this happens after killing Client process several times
cout << "io_context STOPPED, now server runs infinit loop with full cpu usage" << endl;
io_context.run();
catch(const std::exception& e)
MessageBox(0, "This never popup", e.what(), 0);
catch(const boost::exception& e)
MessageBox(0, "This never popup", boost::diagnostic_information(e).data(), 0);
catch(...) MessageBox(0, "This never popup", "", 0);
);
tgroup.join_all();
;
int main()
my_server svr;
svr.start();
客户:
#include <iostream>
#include <random>
#include <thread>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
using namespace std;
using boost::asio::ip::tcp;
namespace ba=boost::asio;
#define SERVER "127.0.0.1"
#define PORT "12345"
int main()
boost::asio::io_context io_context;
static string data_0x4000(0x4000, 'a');
boost::thread_group tgroup;
for (auto i = 0; i < 1000; ++i)
tgroup.create_thread([&]
for(;;)
try
tcp::socket s(io_context);
tcp::resolver resolver(io_context);
boost::asio::connect(s, resolver.resolve(SERVER, PORT));
ba::write(s, ba::buffer(data_0x4000));
catch (std::exception e)
cout << " exception: " << e.what() << endl;
catch (...)
cout << "unknown exception" << endl;
);
tgroup.join_all();
return 0;
更新解决方法:
我猜io_context
和协程会出现问题,所以我尝试将不必要的spawn
替换为std::thread
,并且它起作用了,io_context
不再停止。但是为什么还是会出现这个问题呢?
替换:
ba::spawn(io_context, [&](ba::yield_context yield)
tcp::acceptor acceptor(io_context,
tcp::endpoint(tcp::v4(), SERVER_PORT));
for (;;)
boost::system::error_code ec;
tcp::socket socket(io_context);
acceptor.async_accept(socket, yield[ec]);
if (!ec)
std::make_shared<session>(io_context, std::move(socket))->go();
);
收件人:
std::thread([&]()
tcp::acceptor acceptor(io_context,
tcp::endpoint(tcp::v4(), SERVER_PORT));
for (;;)
boost::system::error_code ec;
tcp::socket socket(io_context);
acceptor.accept(socket, ec);
if (!ec)
std::make_shared<session>(io_context, std::move(socket))->go();
).detach();
【问题讨论】:
【参考方案1】:即使进行(非常)广泛的压力测试,我也无法在 linux 上重现您的问题。
除了某些会话按预期到达“EOF”消息之外,即使是硬杀客户端进程也没有显示任何其他影响。
存在可用端口用完的问题,但这主要是因为您在客户端中重新连接的速度太快了。
跳出框框思考
会不会是您在没有同步的情况下使用std::cout
和/或MessageBox
² 而MSVC 的标准库不能很好地处理它?
会不会是 asio 运行循环引发了 catch
处理程序未正确捕获的异常?我不知道这是否相关,但 MSVC 确实有 SEH(结构化异常)¹
没有必要在那里保持“运行”的紧密循环。如果你真的想继续运行循环,你应该在两者之间调用io_context.restart();
。我不建议这样做,因为它会使任何常规关机变得不可能。
如果您有兴趣,可以对代码进行一些小的调整。它添加了一些处理/制作的会话/连接的可视化。请注意,client
几乎没有变化,但 server
有一些变化可能会激发您的灵感:
server.cpp
#include <iostream>
#include <iomanip>
#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
namespace ba = boost::asio;
using boost::asio::ip::tcp;
using namespace std::literals;
#define SERVER_PORT 12345
#define DATA_LEN 0x4000
void MessageBox(int, std::string const& caption, std::string const& message, ...)
std::cerr << caption << ": " << std::quoted(message) << std::endl;
struct session : public std::enable_shared_from_this<session>
tcp::socket socket_;
ba::steady_timer timer_;
ba::strand<ba::io_context::executor_type> strand_;
explicit session(ba::io_context& io_context, tcp::socket socket)
: socket_(std::move(socket)),
timer_(io_context),
strand_(io_context.get_executor())
void go()
auto self(shared_from_this());
ba::spawn(strand_, [this, self](ba::yield_context yield)
spawn(yield, [this, self](ba::yield_context yield)
while (socket_.is_open())
timer_.expires_from_now(10s);
boost::system::error_code ec;
timer_.async_wait(yield[ec]);
// timeout triggered, timer was not canceled
if (ba::error::operation_aborted != ec)
socket_.close(ec);
);
try
// recv data
std::string packet;
// read data
ba::async_read(socket_,
ba::dynamic_buffer(packet),
ba::transfer_exactly(DATA_LEN),
yield);
std::cout << std::unitbuf << ".";
catch (std::exception const& e)
std::cout << "exception: " << std::quoted(e.what()) << std::endl;
catch (...)
std::cout << "exception" << std::endl;
boost::system::error_code ec;
timer_.cancel(ec);
socket_.close(ec);
);
;
struct my_server
void start()
ba::io_context io_context;
auto worker = ba::make_work_guard(io_context);
ba::spawn(io_context, [&](ba::yield_context yield)
tcp::acceptor acceptor(io_context,
tcp::endpoint(tcp::v4(), SERVER_PORT));
for (;;)
boost::system::error_code ec;
tcp::socket socket(io_context);
acceptor.async_accept(socket, yield[ec]);
if (!ec)
std::make_shared<session>(io_context, std::move(socket))->go();
);
// Run io_context on All CPUs
auto thread_count = std::thread::hardware_concurrency();
boost::thread_group tgroup;
for (auto i = 0u; i < thread_count; ++i)
tgroup.create_thread([&]
for (;;)
try
io_context.run();
break;
catch(const std::exception& e)
MessageBox(0, "This never popup", e.what(), 0);
catch(const boost::exception& e)
MessageBox(0, "This never popup", boost::diagnostic_information(e).data(), 0);
catch(...) MessageBox(0, "This never popup", "", 0);
std::cout << "stopped: " << io_context.stopped() << std::endl;
);
tgroup.join_all();
;
int main()
my_server svr;
svr.start();
client.cpp
#include <iostream>
#include <random>
#include <thread>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
using boost::asio::ip::tcp;
namespace ba=boost::asio;
#define SERVER "127.0.0.1"
#define PORT "12345"
int main()
ba::io_context io_context;
static std::string const data_0x4000(0x4000, 'a');
boost::thread_group tgroup;
for (auto i = 0; i < 1000; ++i)
tgroup.create_thread([&]
for(;;)
try
tcp::socket s(io_context);
tcp::resolver resolver(io_context);
ba::connect(s, resolver.resolve(SERVER, PORT));
s.set_option(ba::socket_base::reuse_address(true));
ba::write(s, ba::buffer(data_0x4000));
catch (std::exception const& e)
std::cout << " exception: " << e.what() << std::endl;
catch (...)
std::cout << "unknown exception" << std::endl;
std::cout << std::unitbuf << ".";
);
tgroup.join_all();
¹ 参见例如https://docs.microsoft.com/en-us/cpp/build/reference/eh-exception-handling-model?view=vs-2019#remarks
² 也许MessageBox
只允许来自“UI”线程。
【讨论】:
如果我删除了MessageBox
,就会出现问题。如果是 SEH,服务器会崩溃,因为我没有用 __try __except
块捕获它。 io_context.restart()
是一种解决方法,但无法解释问题。当端口用完时,我只需等待几分钟让操作系统释放端口,然后再做一次。我已经更新了帖子以寻求解决方法。以上是关于为啥 io_context 出现在我的 boost asio 协程服务器中的主要内容,如果未能解决你的问题,请参考以下文章
std::boost::asio::post / dispatch 使用哪个 io_context?
c++ 错误:使用已删除的函数 boost::asio::io_context::io_context
boost::asio::io_context::run_one_for() 发送大缓冲区失败
防止 boost::asio::io_context 在空投票调用时停止