boost asio async_write:如何不交错 async_write 调用?
Posted
技术标签:
【中文标题】boost asio async_write:如何不交错 async_write 调用?【英文标题】:boost asio async_write : how to not interleaving async_write calls? 【发布时间】:2011-12-06 23:15:52 【问题描述】:这是我的实现:
客户端 A 为客户端 B 发送消息 服务器通过async_read
处理消息适量的数据和
将等待来自客户端 A 的新数据(为了不阻止客户端 A)
之后服务器会处理信息(可能做一个mysql
查询),然后使用async_write
将消息发送给客户端B。
问题是,如果客户端 A 发送消息的速度非常快,async_writes
将在调用之前的 async_write 处理程序之前交错。
有没有简单的方法可以避免这个问题?
编辑 1: 如果客户端 C 在客户端 A 之后向客户端 B 发送消息,则应该会出现相同的问题...
编辑 2: 这行得通吗?因为好像挡住了,不知道在哪里……
namespace structure
class User
public:
User(boost::asio::io_service& io_service, boost::asio::ssl::context& context) :
m_socket(io_service, context), m_strand(io_service), is_writing(false)
ssl_socket& getSocket()
return m_socket;
boost::asio::strand getStrand()
return m_strand;
void push(std::string str)
m_strand.post(boost::bind(&structure::User::strand_push, this, str));
void strand_push(std::string str)
std::cout << "pushing: " << boost::this_thread::get_id() << std::endl;
m_queue.push(str);
if (!is_writing)
write();
std::cout << "going to write" << std::endl;
std::cout << "Already writing" << std::endl;
void write()
std::cout << "writing" << std::endl;
is_writing = true;
std::string str = m_queue.front();
boost::asio::async_write(m_socket,
boost::asio::buffer(str.c_str(), str.size()),
boost::bind(&structure::User::sent, this)
);
void sent()
std::cout << "sent" << std::endl;
m_queue.pop();
if (!m_queue.empty())
write();
return;
else
is_writing = false;
std::cout << "done sent" << std::endl;
private:
ssl_socket m_socket;
boost::asio::strand m_strand;
std::queue<std::string> m_queue;
bool is_writing;
;
#endif
【问题讨论】:
请注意,异步写入的价值远低于异步读取。大多数写入实际上是即时的,因为操作系统将在本地缓冲数据。另一方面,读取可能会阻止等待远程端,而您在本地对此无能为力。因此,同步写入是实现排序的一种可行方式。这也解决了数据所有权的问题——上面的代码是不正确的,因为str
在write()
返回时被销毁,这可能在boost::asio_async_write()
访问缓冲区之前。
【参考方案1】:
有没有简单的方法可以避免这个问题?
是的,为每个客户端维护一个传出队列。检查async_write
完成处理程序中的队列大小,如果非零,则启动另一个async_write
操作。这是一个示例
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <deque>
#include <iostream>
#include <string>
class Connection
public:
Connection(
boost::asio::io_service& io_service
) :
_io_service( io_service ),
_strand( _io_service ),
_socket( _io_service ),
_outbox()
void write(
const std::string& message
)
_strand.post(
boost::bind(
&Connection::writeImpl,
this,
message
)
);
private:
void writeImpl(
const std::string& message
)
_outbox.push_back( message );
if ( _outbox.size() > 1 )
// outstanding async_write
return;
this->write();
void write()
const std::string& message = _outbox[0];
boost::asio::async_write(
_socket,
boost::asio::buffer( message.c_str(), message.size() ),
_strand.wrap(
boost::bind(
&Connection::writeHandler,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
)
);
void writeHandler(
const boost::system::error_code& error,
const size_t bytesTransferred
)
_outbox.pop_front();
if ( error )
std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl;
return;
if ( !_outbox.empty() )
// more messages to send
this->write();
private:
typedef std::deque<std::string> Outbox;
private:
boost::asio::io_service& _io_service;
boost::asio::io_service::strand _strand;
boost::asio::ip::tcp::socket _socket;
Outbox _outbox;
;
int
main()
boost::asio::io_service io_service;
Connection foo( io_service );
一些关键点
boost::asio::io_service::strand
保护对Connection::_outbox
的访问
处理程序是从Connection::write()
调度的,因为它是公开的
如果您在问题的示例中使用类似的做法,对我来说并不明显,因为所有方法都是公开的。
【讨论】:
我已经尝试过这个解决方案,事情是我有一个带有多个线程运行 run() 的 io_service,甚至使用 strand.post 将数据推送到队列上似乎是段错误,因为它被称为来自 2 个不同的线程...知道为什么吗? @TheSquad 这对我来说听起来像是一个单独的问题。您可能错误地实现了您的逻辑,使用链和多线程可以很容易地做到这一点。对于您的原始问题,使用队列是一个合适的解决方案。 你会用什么来知道什么时候应该从队列中弹出数据? @AllanBazinet 阅读了boost::bind
的文档,值得注意的是,这句话:bind 接受的参数由返回的函数对象在内部复制和保存
@SamMiller 因为 'message' 被 boost::bind 返回的函数对象复制并保存,并且在 strand::post 期间“该链将根据需要制作处理程序对象的副本”,那么双端队列'_outbox'真的有必要吗?链本身不是队列吗?【参考方案2】:
只是想改进 Sam 的出色答案。改进点是:
async_write
努力在完成之前从缓冲区发送每个字节,这意味着您应该提供您拥有的所有输入数据 写入操作,否则由于 TCP 数据包比本来的小,帧开销可能会增加。
asio::streambuf
虽然使用起来非常方便,但不是零拷贝。下面的示例演示了一种 零复制 方法:将输入数据块保留在它们所在的位置,并使用 async_write
的分散/聚集重载来接收一系列输入缓冲区(只是指向实际输入数据的指针)。
完整源代码:
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>
using namespace std::chrono_literals;
using boost::asio::ip::tcp;
class Server
class Connection : public std::enable_shared_from_this<Connection>
friend class Server;
void ProcessCommand(const std::string& cmd)
if (cmd == "stop")
server_.Stop();
return;
if (cmd == "")
Close();
return;
std::thread t([this, self = shared_from_this(), cmd]
for (int i = 0; i < 30; ++i)
Write("Hello, " + cmd + " " + std::to_string(i) + "\r\n");
server_.io_service_.post([this, self]
DoReadCmd();
);
);
t.detach();
void DoReadCmd()
read_timer_.expires_from_now(server_.read_timeout_);
read_timer_.async_wait([this](boost::system::error_code ec)
if (!ec)
std::cout << "Read timeout\n";
Shutdown();
);
boost::asio::async_read_until(socket_, buf_in_, '\n', [this, self = shared_from_this()](boost::system::error_code ec, std::size_t bytes_read)
read_timer_.cancel();
if (!ec)
const char* p = boost::asio::buffer_cast<const char*>(buf_in_.data());
std::string cmd(p, bytes_read - (bytes_read > 1 && p[bytes_read - 2] == '\r' ? 2 : 1));
buf_in_.consume(bytes_read);
ProcessCommand(cmd);
else
Close();
);
void DoWrite()
active_buffer_ ^= 1; // switch buffers
for (const auto& data : buffers_[active_buffer_])
buffer_seq_.push_back(boost::asio::buffer(data));
write_timer_.expires_from_now(server_.write_timeout_);
write_timer_.async_wait([this](boost::system::error_code ec)
if (!ec)
std::cout << "Write timeout\n";
Shutdown();
);
boost::asio::async_write(socket_, buffer_seq_, [this, self = shared_from_this()](const boost::system::error_code& ec, size_t bytes_transferred)
write_timer_.cancel();
std::lock_guard<std::mutex> lock(buffers_mtx_);
buffers_[active_buffer_].clear();
buffer_seq_.clear();
if (!ec)
std::cout << "Wrote " << bytes_transferred << " bytes\n";
if (!buffers_[active_buffer_ ^ 1].empty()) // have more work
DoWrite();
else
Close();
);
bool Writing() const return !buffer_seq_.empty();
Server& server_;
boost::asio::streambuf buf_in_;
std::mutex buffers_mtx_;
std::vector<std::string> buffers_[2]; // a double buffer
std::vector<boost::asio::const_buffer> buffer_seq_;
int active_buffer_ = 0;
bool closing_ = false;
bool closed_ = false;
boost::asio::deadline_timer read_timer_, write_timer_;
tcp::socket socket_;
public:
Connection(Server& server) : server_(server), read_timer_(server.io_service_), write_timer_(server.io_service_), socket_(server.io_service_)
void Start()
socket_.set_option(tcp::no_delay(true));
DoReadCmd();
void Close()
closing_ = true;
if (!Writing())
Shutdown();
void Shutdown()
if (!closed_)
closing_ = closed_ = true;
boost::system::error_code ec;
socket_.shutdown(tcp::socket::shutdown_both, ec);
socket_.close();
server_.active_connections_.erase(shared_from_this());
void Write(std::string&& data)
std::lock_guard<std::mutex> lock(buffers_mtx_);
buffers_[active_buffer_ ^ 1].push_back(std::move(data)); // move input data to the inactive buffer
if (!Writing())
DoWrite();
;
void DoAccept()
if (acceptor_.is_open())
auto session = std::make_shared<Connection>(*this);
acceptor_.async_accept(session->socket_, [this, session](boost::system::error_code ec)
if (!ec)
active_connections_.insert(session);
session->Start();
DoAccept();
);
boost::asio::io_service io_service_;
tcp::acceptor acceptor_;
std::unordered_set<std::shared_ptr<Connection>> active_connections_;
const boost::posix_time::time_duration read_timeout_ = boost::posix_time::seconds(30);
const boost::posix_time::time_duration write_timeout_ = boost::posix_time::seconds(30);
public:
Server(int port) : acceptor_(io_service_, tcp::endpoint(tcp::v6(), port), false)
void Run()
std::cout << "Listening on " << acceptor_.local_endpoint() << "\n";
DoAccept();
io_service_.run();
void Stop()
acceptor_.close();
std::vector<std::shared_ptr<Connection>> sessionsToClose;
copy(active_connections_.begin(), active_connections_.end(), back_inserter(sessionsToClose));
for (auto& s : sessionsToClose)
s->Shutdown();
active_connections_.clear();
io_service_.stop();
;
int main()
try
Server srv(8888);
srv.Run();
catch (const std::exception& e)
std::cerr << "Error: " << e.what() << "\n";
【讨论】:
以上是关于boost asio async_write:如何不交错 async_write 调用?的主要内容,如果未能解决你的问题,请参考以下文章
为啥 boost::asio::async_write 第一次运行良好,第二次出现问题?
我应该如何在 boost::asio 的客户端应用程序中同时使用 async_read_until 和 async_write?
boost::asio::async_write 写入 ssl::stream 成功但服务器未获取