如何使用超时创建 boost::async_read 和 async_write
Posted
技术标签:
【中文标题】如何使用超时创建 boost::async_read 和 async_write【英文标题】:how to create boost::async_read and async_write with timeout 【发布时间】:2021-03-10 13:44:56 【问题描述】:我正在尝试使用boost::asio
编写服务器,但我希望boost::asio::async_read
操作在没有数据到来时超时,但我知道该怎么做。
这是我目前的代码
void do_read_header()
auto self(shared_from_this());
std::cout << "do_read_header\n";
boost::asio::async_read(
socket_, boost::asio::buffer(res.data(), res.header_length),
[this, self](boost::system::error_code ec,
std::size_t length)
if (!ec && res.decode_header())
do_read_body();
);
do_write();
void do_read_body()
auto self(shared_from_this());
Message msg;
std::cout << "do_read_body\n";
boost::asio::async_read(
socket_, boost::asio::buffer(res.body(), res.body_length()),
[this, self](boost::system::error_code ec,
std::size_t length)
if (!length)
return;
if (!ec)
try
std::cout << "read " << res.body() << "\n";
request_queue_.send(res.body(), res.body_length(),
0);
catch (const std::exception& ex)
std::cout << ex.what() << "\n";
else
if (ec)
std::cerr << "read error:" << ec.value()
<< " message: " << ec.message() << "\n";
socket_.close();
do_read_header();
);
void start()
post(strand_, [this, self = shared_from_this()]
do_read_header();
do_write();
);
class Server
public:
Server(boost::asio::io_service& io_service, short port)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service)
do_accept();
private:
void do_accept()
acceptor_.async_accept(
socket_, [this](boost::system::error_code ec)
if (!ec)
std::cout << "accept connection\n";
std::make_shared<Session>(std::move(socket_))
->start();
do_accept();
);
tcp::acceptor acceptor_;
tcp::socket socket_;
;
【问题讨论】:
取决于你想在超时后做什么。是否要关闭套接字? 不,我想转到do_write()
函数
听起来你想要一个deadline timer and async_wait
。然后根据哪个处理程序采取适当的行动——计时器或读取操作。 -- 首先被调用。
您可能应该只发布自包含代码。我有一种非常独特的感觉,我以前看过这段代码,但我又花了很多时间让它完整。如果我可以通过猜测来做到这一点,也许你应该能够做到:)
【参考方案1】:
您可以添加一个截止时间计时器来取消 IO 操作。您可以观察到取消,因为将使用error::operation_aborted
调用完成。
deadline_.expires_from_now(1s);
deadline_.async_wait([self, this] (error_code ec)
if (!ec) socket_.cancel();
);
我花了大约 45 分钟使您的其余代码独立:
在这个例子中,我假设我们 希望最多等待 5 秒以等待新标头到达(因此在新会话开始后或直到下一个请求在同一会话中到达) 之后必须在 1 秒内收到全身还请注意,我们避免关闭套接字 - 这是在会话的析构函数中完成的。最好优雅地关机。
Live Demo
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iomanip>
#include <iostream>
using namespace std::chrono_literals;
namespace bip = boost::interprocess;
using boost::asio::ip::tcp;
using boost::system::error_code;
using Queue = boost::interprocess::message_queue;
static constexpr auto MAX_MESG_LEN = 100;
static constexpr auto MAX_MESGS = 10;
struct Message
using Len = boost::endian::big_uint32_t;
struct header_t
Len len;
;
static const auto header_length = sizeof(header_t);
std::array<char, MAX_MESG_LEN + header_length> buf;
char const* data() const return buf.data();
char* data() return buf.data();
char const* body() const return data() + header_length;
char* body() return data() + header_length;
static_assert(std::is_standard_layout_v<header_t> and
std::is_trivial_v<header_t>);
Len body_length() const return std::min(h().len, max_body_length());
Len max_body_length() const return buf.max_size() - header_length;
bool decode_header() return h().len <= max_body_length();
bool set_body(std::string_view value)
assert(value.length() <= max_body_length());
h().len = value.length();
std::copy_n(value.begin(), body_length(), body());
return (value.length() == body_length()); // not truncated
private:
header_t& h() return *reinterpret_cast<header_t*>(data());
header_t const& h() const return *reinterpret_cast<header_t const*>(data());
;
struct Session : std::enable_shared_from_this<Session>
Session(tcp::socket&& s) : socket_(std::move(s))
void start()
post(strand_,
[ this, self = shared_from_this() ] do_read_header(); );
private:
using Strand = boost::asio::strand<tcp::socket::executor_type>;
using Timer = boost::asio::steady_timer;
tcp::socket socket_strand_;
Strand strand_make_strand(socket_.get_executor());
Message res;
Queue request_queue_bip::open_or_create, "SendQueue", MAX_MESGS, MAX_MESG_LEN;
Timer recv_deadline_strand_;
void do_read_header()
auto self(shared_from_this());
std::cout << "do_read_header: " << res.header_length << std::endl;
recv_deadline_.expires_from_now(5s);
recv_deadline_.async_wait([ self, this ](error_code ec)
if (!ec)
std::cerr << "header timeout" << std::endl;
socket_.cancel();
);
boost::asio::async_read(
socket_, boost::asio::buffer(res.data(), res.header_length),
[ this, self ](error_code ec, size_t /*length*/)
std::cerr << "header: " << ec.message() << std::endl;
recv_deadline_.cancel();
if (!ec && res.decode_header())
do_read_body();
else
socket_.shutdown(tcp::socket::shutdown_both);
);
void do_read_body()
auto self(shared_from_this());
// Message msg;
std::cout << "do_read_body: " << res.body_length() << std::endl;
recv_deadline_.expires_from_now(1s);
recv_deadline_.async_wait([self, this] (error_code ec)
if (!ec)
std::cerr << "body timeout" << std::endl;
socket_.cancel();
);
boost::asio::async_read(
socket_,
boost::asio::buffer(res.body(), res.body_length()),
boost::asio::transfer_exactly(res.body_length()),
[ this, self ](error_code ec, std::size_t length)
std::cerr << "body: " << ec.message() << std::endl;
recv_deadline_.cancel();
if (!ec)
try
// Not safe to print unless NUL-terminated, see e.g.
// https://***.com/questions/66278813/boost-deadline-timer-causes-stack-buffer-overflow/66279497#66279497
if (length)
request_queue_.send(res.body(), res.body_length(), 0);
catch (const std::exception& ex)
std::cout << ex.what() << std::endl;
do_read_header();
else
socket_.shutdown(tcp::socket::shutdown_both);
);
;
class Server
public:
Server(boost::asio::io_service& io_service, short port)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service)
do_accept();
private:
void do_accept()
acceptor_.async_accept(socket_, [ this ](error_code ec)
std::cerr << "async_accept: " << ec.message() << std::endl;
if (!ec)
std::cerr << "session: " << socket_.remote_endpoint() << std::endl;
std::make_shared<Session>(std::move(socket_))->start();
do_accept();
);
tcp::acceptor acceptor_;
tcp::socket socket_;
;
int main(int argc, char**)
Queue queuebip::open_or_create, "SendQueue", MAX_MESGS, MAX_MESG_LEN; // ensure it exists
if (argc == 1)
boost::asio::io_context ioc;
Server s(ioc, 8989);
ioc.run_for(10s);
else
while (true)
using Buf = std::array<char, MAX_MESG_LEN>;
Buf buf;
unsigned prio;
size_t n;
queue.receive(buf.data(), buf.size(), n, prio);
std::cout << "Received: " << std::quoted(std::string_view(buf.data(), n)) << std::endl;
可测试
./sotest
在另一个终端:
./sotest consumer
还有其他地方,例如一些不会超时的请求:
for msg in '0000 0000' '0000 0001 31' '0000 000c 6865 6c6c 6f20 776f 726c 640a'
do
xxd -r -p <<< "$msg" |
netcat localhost 8989 -w 1
done
或者,单个会话的多个请求,然后会话超时(-w 6
超过 5 秒):
msg='0000 0000 0000 0001 31 0000 000c 6865 6c6c 6f20 776f 726c 640a'; xxd -r -p <<< "$msg"| netcat localhost 8989 -w 6
【讨论】:
添加了一个现场演示的 GIF,因为没有在线编译器做 IPC 消息队列 非常感谢。在我的代码中,我有 alosdo_write
从消息队列中读取并将 async_write
它发送给客户端 - 但是你回答我的内容也帮助我解决了它:)。【参考方案2】:
在实现串行端口包装器时,我必须解决类似的问题,这是代码的简化版本:
// Possible outcome of a read. Set by callback
enum class ReadResult
ResultInProgress,
ResultSuccess,
ResultError,
ResultTimeout
;
std::streamsize read(char* s, std::streamsize n, boost::posix_time::time_duration timeout)
boost::asio::io_service io;
boost::asio::serial_port port(io);
// result is atomic to avoid race condition
std::atomic<ReadResult> result(ReadResult::ResultInProgress);
std::streamsize bytesTransferred = 0;
// Create async timer that fires after duration
boost::asio::deadline_timer timer(io);
timer.expires_from_now(timeout);
timer.async_wait(boost::bind([&result](const boost::system::error_code& error)
// If there wasn't any error and reading is still in progress, set result as timeout
if (!error && result == ReadResult::ResultInProgress)
result = ReadResult::ResultTimeout;
,boost::asio::placeholders::error));
// Read asynchronously
port.async_read_some(boost::asio::buffer(s, n), boost::bind([&result,&bytesTransferred](const boost::system::error_code& error,
const size_t transferred)
// If there wasn't any error on read finish set result as sucess else as error
if (!error)
result = ReadResult::ResultSuccess;
bytesTransferred = transferred;
return;
result = ReadResult::ResultError;
,boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
// Run loop until timeout, error or success occurs
for (;;)
io.run_one();
switch (result)
case ReadResult::ResultSuccess:
// Success, cancel timer and return amount of bytes read
timer.cancel();
return bytesTransferred;
case ReadResult::ResultTimeout:
// Timeout occured, cancel read and throw exception
port.cancel();
throw(TimeoutException("Timeout expired"));
case ReadResult::ResultError:
// Error occured, cancel read and timer and throw exception
port.cancel();
timer.cancel();
throw(std::ios_base::failure("Error while reading"));
default:
//if result is still in progress remain in the loop
break;
所以基本上,你要做的是:
-
使用 io_service 初始化计时器
使用设置结果超时标志的回调函数在计时器上调用 async_wait
在您的套接字上调用 async_read 并使用设置结果成功标志的回调
循环直到结果不再是“InProgress”,注意
io.run_one()
in loop 很重要
处理结果结果
你可以将它用于任何异步函数
【讨论】:
wjy不是在完成async_wait
时简单地取消读吗?以上是关于如何使用超时创建 boost::async_read 和 async_write的主要内容,如果未能解决你的问题,请参考以下文章