boost::async 读写在一次读取和两次写入后卡住
Posted
技术标签:
【中文标题】boost::async 读写在一次读取和两次写入后卡住【英文标题】:boost::async read write stuck after one read and two writes 【发布时间】:2021-02-18 18:26:10 【问题描述】:我试图构建一个服务器-客户端应用程序,服务器端是 c++,带有 boost::asio 异步,但是当我运行应用程序时,它在 1 次读取和两次写入后卡住了。
这是我删除 do_write
函数调用时的相关代码,我收到了我正在发送的所有 5 条消息
编辑 当我使用调试器运行时,我能够接收所有五条消息并发送所有响应
void start()
std::thread t1([this]()do_write(););
t1.detach();
std::thread t2([this]()do_read(););
t2.detach();
void do_write()
auto self(shared_from_this());
Message msg;
while(order_response_queue_.empty())
auto order_response = order_response_queue_.front();
order_response_queue_.pop();
try
auto m = order_response.SerializeToString();
msg.body_length(std::strlen(m.c_str()));
std::memcpy(msg.body(), m.c_str(), std::strlen(m.c_str()));
msg.encode_header();
std::memcpy(data_,msg.data(), msg.length());
//std::memcpy(static_cast<void *>(msg.data()), data_, msg.length());
catch (std::exception& e )
std::cout << e.what();
std::cout <<"write: " << msg.body() << "\n";
boost::asio::async_write(socket_, boost::asio::buffer(data_,msg.length()),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
if (ec)
std::cerr << "write error:" << ec.value() << " message: " << ec.message() << "\n";
do_write();
);
void do_read()
auto self(shared_from_this());
Message msg;
socket_.async_read_some(boost::asio::buffer(res.data(), res.header_length),
[this, self](boost::system::error_code ec, std::size_t length)
if (!ec && res.decode_header())
std::string st(res.body());
boost::asio::async_read(socket_, boost::asio::buffer(res.body(), res.body_length()), [this](boost::system::error_code ec, std::size_t length)
if (!ec)
std::cout << "read " << res.body() << "\n";
req_.DeserializeFromChar(res.body());
order_request_queue_.push(req_);
else
if (ec)
std::cerr << "read error:" << ec.value() << " message: "
<< ec.message() << "\n";
socket_.close();
);
do_read();
);
这里是io_service
class Server
public:
Server(boost::asio::io_service& io_service, short port,std::queue<OrderRequest> &order_request_queue,
std::queue<OrderResponse> &order_response_queue)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service) , order_response_queue_(order_response_queue), order_request_queue_(order_request_queue)
do_accept();
private:
void do_accept()
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec)
if (!ec)
std::cout << "accept connection\n";
std::make_shared<Session>(std::move(socket_),order_request_queue_, order_response_queue_)->start();
do_accept();
);
tcp::acceptor acceptor_;
tcp::socket socket_;
std::queue<OrderResponse> &order_response_queue_;
std::queue<OrderRequest> &order_request_queue_;
;
【问题讨论】:
我们能看到更多代码吗?处理由async_write
调用的对do_write()
的后续调用的线程的代码在哪里?
@DavidSchwartz 如果我理解正确的话,start()
函数
start
函数创建一个调用do_write
然后终止的线程和一个调用do_read
然后终止的线程。处理异步操作完成的线程在哪里?!你没有?
@DavidSchwartz async_read
和 async_write
在 lemda 函数中调用自己作为处理程序
对。您将处理完成运行的线程的处理程序传递给他们。但是该线程的代码在哪里?它是在哪里创建的?当没有要处理的完成时它会做什么?您的代码创建了两个线程,这两个线程都在调用完成处理程序之前终止(并且无论如何都不调用任何完成处理程序)。 executor
/io_service
代码在哪里?
【参考方案1】:
您应该使示例独立。
Message
是什么,
data_
来自哪里,
res
来自哪里
为什么data_
与msg.length()
一起使用,
是否包括标头长度,
req_
和res
之间有什么关系
它们的寿命是多少。
为什么要使用线程异步
正在运行的执行上下文在哪里等等。
这是您给我们的评论:
您在线程中使用order_response_queue_
,没有任何锁定。这真的不行,因为 do_write
线程上没有任何东西会推送到该队列上,所以其他线程必须这样做。
说实话,我认为最好的起点是删除线程。您似乎使用它们的主要原因是:
“保持循环运行”,这已经是异步执行上下文的一个特性。但是,您已经链接到完成处理程序(例如再次调用 do_write
),所以没关系
能够“等待”队列中的消息:
while (order_response_queue_.empty())
通常的方法是在推送第一条消息时有条件地启动循环,并在队列为空时让它“停止”。无论如何,这避免了忙碌的旋转。
只需将start
替换为类似
void start()
post(strand_, [this, self = shared_from_this()]
if (!order_response_queue_.empty())
do_write();
do_read();
);
还有类似的东西
void enqueue_response(Message stuff)
post(strand_, [this, stuff, self = shared_from_this()]
order_response_queue_.push(std::move(stuff));
if (1 == order_response_queue_.size()) // not idle/already writing
do_write();
);
请注意,使用共享队列的每个操作都在逻辑链上同步,以防您使用多线程执行上下文。
原始草图
我不太明白 Serialize/Deserialize 背后与 Message 相关的想法,所以这可能行不通 - 尽管我觉得它会给你很多关于如何简化事情的线索:
Live On Compiler Explorer*
#include <thread>
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <queue>
using boost::asio::ip::tcp;
struct Message
struct header_t
// TODO add magic bytes
std::uint32_t body_len = 0; // network byte order
;
enum header_length = sizeof(header_t) ;
std::string _contentsheader_length;
size_t body_length() const
return length() - header_length;
void body_length(size_t n)
_contents.resize(n+header_length);
encode_header();
bool decode_header()
assert(_contents.length() >= header_length);
auto &header = *reinterpret_cast<header_t *>(_contents.data());
body_length(ntohl(header.body_len));
// TODO verify magic bytes
return true;
void encode_header()
assert(_contents.length() >= header_length);
auto &header = *reinterpret_cast<header_t *>(_contents.data());
header.body_len = htonl(_contents.length() - header_length);
std::string SerializeToString() const return _contents;
void DeserializeFromChar(char const*) // TODO IMPLEMENTATION
char *data() return _contents.data();
char *body() return data() + header_length;
char const *data() const return _contents.data();
char const *body() const return data() + header_length;
size_t length() const return _contents.size();
bool operator<(Message const &rhs) const return _contents < rhs._contents;
;
struct X : std::enable_shared_from_this<X>
std::queue<Message> order_request_queue_, order_response_queue_;
void start()
post(strand_, [this, self = shared_from_this()]
if (!order_response_queue_.empty())
do_write();
do_read();
);
void enqueue_response(Message stuff)
post(strand_, [this, stuff, self = shared_from_this()]
order_response_queue_.push(std::move(stuff));
if (1 == order_response_queue_.size()) // not idle/already writing
do_write();
);
void do_write()
auto self(shared_from_this());
auto order_response = std::move(order_response_queue_.front());
order_response_queue_.pop();
Message msg;
try
auto m = order_response.SerializeToString();
msg.body_length(std::strlen(m.c_str()));
std::memcpy(msg.body(), m.c_str(), std::strlen(m.c_str()));
msg.encode_header();
assert(msg.length() <= data_.size());
std::memcpy(data_.data(), msg.data(), msg.length());
// std::memcpy(static_cast<void *>(msg.data()), data_,
// msg.length());
catch (std::exception &e)
std::cout << e.what();
std::cout << "write: " << msg.body() << "\n";
boost::asio::async_write(
socket_, boost::asio::buffer(data_, msg.length()),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
if (ec)
std::cerr << "write error:" << ec.value()
<< " message: " << ec.message() << "\n";
else if (!order_response_queue_.empty())
do_write();
);
Message res, req_;
void do_read()
auto self(shared_from_this());
Message msg;
socket_.async_read_some(
boost::asio::buffer(res.data(), res.header_length),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
if (!ec && res.decode_header())
std::string st(res.body());
boost::asio::async_read(
socket_,
boost::asio::buffer(res.body(), res.body_length()),
[this](boost::system::error_code ec,
std::size_t /*length*/)
if (!ec)
std::cout << "read " << res.body() << "\n";
req_.DeserializeFromChar(res.body());
order_request_queue_.push(req_);
else
if (ec)
std::cerr << "read error:" << ec.value()
<< " message: " << ec.message()
<< "\n";
socket_.close();
);
do_read();
);
using Executor = boost::asio::system_executor;
boost::asio::strand<Executor> strand_ Executor ;
tcp::socket socket_ strand_ ;
std::array<char, 1024> data_;
// quick & dirty connection
X() socket_.connect(, 8989);
;
int main()
auto x = std::make_shared<X>();
x->start();
Message demo;
std::string_view greeting"Hello world!";
demo.body_length(greeting.length());
std::copy(greeting.begin(), greeting.end(), demo.body());
x->enqueue_response();
boost::asio::query(boost::asio::system_executor(),
boost::asio::execution::context)
.join();
【讨论】:
感谢您的详细回答,启动函数上的线程的原因是同时在写入和读取上工作,但根据我从您的回答中了解到的情况,这是不合格的。并且我的 IDE 无法识别 system_execute (boost 1.58) 是的,只需使用 io_service。不过那已经很旧了。 (如果你展示你的代码会有所帮助,所以我不必编造东西)。此外,您不需要 any 线程来同时读取和写入。这就是异步 IO 的本质。我有很多答案,我展示了使用 Asio 的单线程多连接全双工 TCP 服务器。具体来说,我建议您查看一些典型的全双工示例(在图书馆聊天示例中或on this site) 非常感谢,我想我解决了这个问题。以上是关于boost::async 读写在一次读取和两次写入后卡住的主要内容,如果未能解决你的问题,请参考以下文章