如何使用超时创建 boost::async_read 和 async_write

Posted

技术标签:

【中文标题】如何使用超时创建 boost::async_read 和 async_write【英文标题】:how to create boost::async_read and async_write with timeout 【发布时间】:2021-03-10 13:44:56 【问题描述】:

我正在尝试使用boost::asio 编写服务器,但我希望boost::asio::async_read 操作在没有数据到来时超时,但我知道该怎么做。 这是我目前的代码

void do_read_header() 
    auto self(shared_from_this());
    std::cout << "do_read_header\n";
    boost::asio::async_read(
        socket_, boost::asio::buffer(res.data(), res.header_length),
        [this, self](boost::system::error_code ec,
                     std::size_t length) 
            if (!ec && res.decode_header()) 
                do_read_body();
            
        );
    do_write();


void do_read_body() 
    auto self(shared_from_this());
    Message msg;
    std::cout << "do_read_body\n";
    boost::asio::async_read(
        socket_, boost::asio::buffer(res.body(), res.body_length()),
        [this, self](boost::system::error_code ec,
                     std::size_t length) 
            if (!length) 
                return;
            
            if (!ec) 
                try 
                    std::cout << "read " << res.body() << "\n";
                    request_queue_.send(res.body(), res.body_length(),
                                        0);
                 catch (const std::exception& ex) 
                    std::cout << ex.what() << "\n";
                
             else 
                if (ec) 
                    std::cerr << "read error:" << ec.value()
                              << " message: " << ec.message() << "\n";
                
                socket_.close();
            
            do_read_header();
        );


void start() 
    post(strand_, [this, self = shared_from_this()] 
        do_read_header();
        do_write();
    );


class Server 
  public:
    Server(boost::asio::io_service& io_service, short port)
        : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
          socket_(io_service) 
        do_accept();
    

  private:
    void do_accept() 
        acceptor_.async_accept(
            socket_, [this](boost::system::error_code ec) 
                if (!ec) 
                    std::cout << "accept connection\n";

                    std::make_shared<Session>(std::move(socket_))
                        ->start();
                

                do_accept();
            );
    

    tcp::acceptor acceptor_;
    tcp::socket socket_;
;

【问题讨论】:

取决于你想在超时后做什么。是否要关闭套接字? 不,我想转到do_write() 函数 听起来你想要一个deadline timer and async_wait。然后根据哪个处理程序采取适当的行动——计时器或读取操作。 -- 首先被调用。 您可能应该只发布自包含代码。我有一种非常独特的感觉,我以前看过这段代码,但我又花了很多时间让它完整。如果我可以通过猜测来做到这一点,也许你应该能够做到:) 【参考方案1】:

您可以添加一个截止时间计时器来取消 IO 操作。您可以观察到取消,因为将使用error::operation_aborted 调用完成。

deadline_.expires_from_now(1s);
deadline_.async_wait([self, this] (error_code ec) 
    if (!ec) socket_.cancel();
);

我花了大约 45 分钟使您的其余代码独立:

在这个例子中,我假设我们 希望最多等待 5 秒以等待新标头到达(因此在新会话开始后或直到下一个请求在同一会话中到达) 之后必须在 1 秒内收到全身

还请注意,我们避免关闭套接字 - 这是在会话的析构函数中完成的。最好优雅地关机。

Live Demo

#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iomanip>
#include <iostream>

using namespace std::chrono_literals;
namespace bip = boost::interprocess;
using boost::asio::ip::tcp;
using boost::system::error_code;
using Queue = boost::interprocess::message_queue;

static constexpr auto MAX_MESG_LEN = 100;
static constexpr auto MAX_MESGS = 10;

struct Message 
    using Len = boost::endian::big_uint32_t;

    struct header_t 
        Len len;
    ;
    static const auto header_length = sizeof(header_t);
    std::array<char, MAX_MESG_LEN + header_length> buf;

    char const* data() const  return buf.data();              
    char*       data()        return buf.data();              
    char const* body() const  return data() + header_length;  
    char*       body()        return data() + header_length; 

    static_assert(std::is_standard_layout_v<header_t> and
                  std::is_trivial_v<header_t>);

    Len body_length() const      return std::min(h().len, max_body_length());  
    Len max_body_length() const  return buf.max_size() - header_length;        
    bool decode_header()         return h().len <= max_body_length();          

    bool set_body(std::string_view value) 
        assert(value.length() <= max_body_length());
        h().len = value.length();
        std::copy_n(value.begin(), body_length(), body());

        return (value.length() == body_length()); // not truncated
    

  private:
    header_t&       h()        return *reinterpret_cast<header_t*>(data());        
    header_t const& h() const  return *reinterpret_cast<header_t const*>(data()); 
;

struct Session : std::enable_shared_from_this<Session> 
    Session(tcp::socket&& s) : socket_(std::move(s)) 

    void start() 
        post(strand_,
             [ this, self = shared_from_this() ]  do_read_header(); );
    

private:
    using Strand = boost::asio::strand<tcp::socket::executor_type>;
    using Timer  = boost::asio::steady_timer;

    tcp::socket socket_strand_;
    Strand      strand_make_strand(socket_.get_executor());
    Message     res;
    Queue       request_queue_bip::open_or_create, "SendQueue", MAX_MESGS, MAX_MESG_LEN;
    Timer       recv_deadline_strand_;

    void do_read_header() 
        auto self(shared_from_this());
        std::cout << "do_read_header: " << res.header_length << std::endl;
        recv_deadline_.expires_from_now(5s);
        recv_deadline_.async_wait([ self, this ](error_code ec) 
            if (!ec) 
                std::cerr << "header timeout" << std::endl;
                socket_.cancel();
            
        );

        boost::asio::async_read(
            socket_, boost::asio::buffer(res.data(), res.header_length),
            [ this, self ](error_code ec, size_t /*length*/) 
                std::cerr << "header: " << ec.message() << std::endl;
                recv_deadline_.cancel();
                if (!ec && res.decode_header()) 
                    do_read_body();
                 else 
                    socket_.shutdown(tcp::socket::shutdown_both);
                
            );
    

    void do_read_body() 
        auto self(shared_from_this());
        // Message msg;
        std::cout << "do_read_body: " << res.body_length() << std::endl;

        recv_deadline_.expires_from_now(1s);
        recv_deadline_.async_wait([self, this] (error_code ec) 
            if (!ec) 
                std::cerr << "body timeout" << std::endl;
                socket_.cancel();
            
        );

        boost::asio::async_read(
            socket_,
            boost::asio::buffer(res.body(), res.body_length()),
            boost::asio::transfer_exactly(res.body_length()),
            [ this, self ](error_code ec, std::size_t length) 
                std::cerr << "body: " << ec.message() << std::endl;
                recv_deadline_.cancel();
                if (!ec) 
                    try 
                        // Not safe to print unless NUL-terminated, see e.g.
                        // https://***.com/questions/66278813/boost-deadline-timer-causes-stack-buffer-overflow/66279497#66279497
                        if (length)
                            request_queue_.send(res.body(), res.body_length(), 0);
                     catch (const std::exception& ex) 
                        std::cout << ex.what() << std::endl;
                    
                    do_read_header();
                 else 
                    socket_.shutdown(tcp::socket::shutdown_both);
                
            );
    
;

class Server 
  public:
    Server(boost::asio::io_service& io_service, short port)
        : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
          socket_(io_service) 
        do_accept();
    

  private:
    void do_accept() 
        acceptor_.async_accept(socket_, [ this ](error_code ec) 
            std::cerr << "async_accept: " << ec.message() << std::endl;
            if (!ec) 
                std::cerr << "session: " << socket_.remote_endpoint() << std::endl;
                std::make_shared<Session>(std::move(socket_))->start();
            

            do_accept();
        );
    

    tcp::acceptor acceptor_;
    tcp::socket   socket_;
;

int main(int argc, char**) 
    Queue queuebip::open_or_create, "SendQueue", MAX_MESGS, MAX_MESG_LEN; // ensure it exists

    if (argc == 1) 
        boost::asio::io_context ioc;
        Server s(ioc, 8989);

        ioc.run_for(10s);
     else 
        while (true) 
            using Buf = std::array<char, MAX_MESG_LEN>;
            Buf      buf;
            unsigned prio;
            size_t   n;
            queue.receive(buf.data(), buf.size(), n, prio);

            std::cout << "Received: " << std::quoted(std::string_view(buf.data(), n)) << std::endl;
        
    

可测试

./sotest

在另一个终端:

./sotest consumer

还有其他地方,例如一些不会超时的请求:

for msg in '0000 0000' '0000 0001 31' '0000 000c 6865 6c6c 6f20 776f 726c 640a'
do
    xxd -r -p <<< "$msg" |
        netcat localhost 8989 -w 1
done

或者,单个会话的多个请求,然后会话超时(-w 6 超过 5 秒):

msg='0000 0000 0000 0001 31 0000 000c 6865 6c6c 6f20 776f 726c 640a'; xxd -r -p <<< "$msg"| netcat localhost 8989 -w 6

【讨论】:

添加了一个现场演示的 GIF,因为没有在线编译器做 IPC 消息队列 非常感谢。在我的代码中,我有 alos do_write 从消息队列中读取并将 async_write 它发送给客户端 - 但是你回答我的内容也帮助我解决了它:)。【参考方案2】:

在实现串行端口包装器时,我必须解决类似的问题,这是代码的简化版本:

// Possible outcome of a read. Set by callback
enum class ReadResult

    ResultInProgress,
    ResultSuccess,
    ResultError,
    ResultTimeout
;

std::streamsize read(char* s, std::streamsize n, boost::posix_time::time_duration timeout)


    boost::asio::io_service io;
    boost::asio::serial_port port(io);
    // result is atomic to avoid race condition
    std::atomic<ReadResult> result(ReadResult::ResultInProgress); 
    std::streamsize bytesTransferred = 0;

    // Create async timer that fires after duration 
    boost::asio::deadline_timer timer(io);
    timer.expires_from_now(timeout);
    timer.async_wait(boost::bind([&result](const boost::system::error_code& error)
    // If there wasn't any error and reading is still in progress, set result as timeout        
        if (!error && result == ReadResult::ResultInProgress) 
            result = ReadResult::ResultTimeout;
    ,boost::asio::placeholders::error));

    // Read asynchronously
    port.async_read_some(boost::asio::buffer(s, n), boost::bind([&result,&bytesTransferred](const boost::system::error_code& error,
    const size_t transferred)
        // If there wasn't any error on read finish set result as sucess else as error
        if (!error)
             result = ReadResult::ResultSuccess;
             bytesTransferred = transferred;
             return;
        
        result = ReadResult::ResultError;
    ,boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));

    // Run loop until timeout, error or success occurs
    for (;;)
    
        io.run_one();
        switch (result)
        
        case ReadResult::ResultSuccess:
            // Success, cancel timer and return amount of bytes read
            timer.cancel();
            return bytesTransferred;
        case ReadResult::ResultTimeout:
            // Timeout occured, cancel read and throw exception
            port.cancel();
            throw(TimeoutException("Timeout expired"));
        case ReadResult::ResultError:
            // Error occured, cancel read and timer and throw exception
            port.cancel();
            timer.cancel();
            throw(std::ios_base::failure("Error while reading"));
        default:
            //if result is still in progress remain in the loop
            break;
        
    


所以基本上,你要做的是:

    使用 io_service 初始化计时器 使用设置结果超时标志的回调函数在计时器上调用 async_wait 在您的套接字上调用 async_read 并使用设置结果成功标志的回调 循环直到结果不再是“InProgress”,注意io.run_one() in loop 很重要 处理结果结果

你可以将它用于任何异步函数

【讨论】:

wjy不是在完成async_wait时简单地取消读吗?

以上是关于如何使用超时创建 boost::async_read 和 async_write的主要内容,如果未能解决你的问题,请参考以下文章

如何正确设置浏览器创建的桌面通知的无限超时

如何创建超时

我应该如何创建一个总是超时的测试资源

如何使用django设置发送电子邮件的超时时间?

一直在使用 React,如何设置超时或创建一个类似于每隔几分钟显示来自 graphql api 的新数据的数据轮播?

如何模拟超时响应