boost::asio::async_write 写入 ssl::stream 成功但服务器未获取
Posted
技术标签:
【中文标题】boost::asio::async_write 写入 ssl::stream 成功,但服务器未收到数据【英文标题】:boost::asio::async_write write ssl::stream succuess but server not get 【发布时间】:2021-08-26 05:14:26 【问题描述】:我编写了一个 SSL 服务器和一个客户端,让它们进行乒乓(ping-pong)式的收发。运行一段时间后,客户端显示数据发送成功,但服务器并没有收到。客户端以多线程方式运行;改为单线程时,看起来一切正常。我尝试过借助定时器加入一种类似“握手”的应答机制,这样服务器就能收到所有数据,但我希望不依赖这种机制也能正常运行。有人能帮忙找出问题所在吗?
这是我的服务器代码:
#include <cstdlib>
#include <functional>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/bind/bind.hpp>
using boost::asio::ip::tcp;
class session : public std::enable_shared_from_this<session>
public:
session(tcp::socket socket, boost::asio::ssl::context &context)
: socket_(std::move(socket), context), m_strand(socket.get_executor())
void start()
do_handshake();
private:
void do_handshake()
auto self(shared_from_this());
socket_.async_handshake(boost::asio::ssl::stream_base::server,
[this, self](const boost::system::error_code &error)
if (!error)
do_read();
);
void do_read()
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(data_),
[this, self](const boost::system::error_code &ec, std::size_t length)
if (!ec)
std::cout << "get <";
std::cout.write(data_, length);
std::cout << std::endl;
do_write(length);
);
void do_write(std::size_t length)
auto self(shared_from_this());
std::cout << "send <";
std::cout.write(data_, length);
std::cout << std::endl;
boost::asio::async_write(socket_, boost::asio::buffer(data_, length),
[this, self](const boost::system::error_code &ec,
std::size_t /*length*/)
if (!ec)
do_read();
);
boost::asio::ssl::stream<tcp::socket> socket_;
boost::asio::strand<boost::asio::ip::tcp::socket::executor_type> m_strand;
char data_[1024];
;
class server
public:
server(boost::asio::io_context &io_context, unsigned short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
context_(boost::asio::ssl::context::sslv23)
context_.set_options(
boost::asio::ssl::context::default_workarounds
| boost::asio::ssl::context::no_sslv2
| boost::asio::ssl::context::single_dh_use);
context_.set_password_callback(std::bind(&server::get_password, this));
context_.use_certificate_chain_file("server.pem");
context_.use_private_key_file("server.pem", boost::asio::ssl::context::pem);
context_.use_tmp_dh_file("dh2048.pem");
do_accept();
private:
std::string get_password() const
return "test";
void do_accept()
acceptor_.async_accept(
[this](const boost::system::error_code &error, tcp::socket socket)
if (!error)
std::make_shared<session>(std::move(socket), context_)->start();
do_accept();
);
tcp::acceptor acceptor_;
boost::asio::ssl::context context_;
;
int main(int argc, char *argv[])
try
if (argc != 2)
std::cerr << "Usage: server <port>\n";
return 1;
boost::asio::io_context io_context;
using namespace std; // For atoi.
server s(io_context, atoi(argv[1]));
io_context.run();
catch (std::exception &e)
std::cerr << "Exception: " << e.what() << "\n";
return 0;
下面是客户端代码:
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/thread.hpp>
using boost::asio::ip::tcp;
using std::placeholders::_1;
using std::placeholders::_2;
// Buffer size constant carried over from the Boost example.
enum
{
    max_length = 1024
};
class client
public:
client(boost::asio::io_context &io_context,
boost::asio::ssl::context &context,
const tcp::resolver::results_type &endpoints)
: socket_(io_context, context), strand_(io_context.get_executor())
socket_.set_verify_mode(boost::asio::ssl::verify_peer);
socket_.set_verify_callback(
std::bind(&client::verify_certificate, this, _1, _2));
connect(endpoints);
private:
bool verify_certificate(bool preverified,
boost::asio::ssl::verify_context &ctx)
char subject_name[256];
X509 *cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
std::cout << "Verifying " << subject_name << "\n";
return true;
void connect(const tcp::resolver::results_type &endpoints)
boost::asio::async_connect(socket_.lowest_layer(), endpoints,
[this](const boost::system::error_code &error,
const tcp::endpoint & /*endpoint*/)
if (!error)
handshake();
else
std::cout << "Connect failed: " << error.message() << "\n";
);
void handshake()
socket_.async_handshake(boost::asio::ssl::stream_base::client,
[this](const boost::system::error_code &error)
if (!error)
send_request("hello ssl");
boost::asio::post(strand_, std::bind(&client::recv, this));
else
std::cout << "Handshake failed: " << error.message() << "\n";
);
void send_request(const std::string &msg)
boost::asio::async_write(
socket_, boost::asio::buffer(msg),
[this](const boost::system::error_code &error, std::size_t length)
if (!error)
std::cout << "send data success, size : " << length << std::endl;
else
std::cout << "Write failed: " << error.message() << std::endl;
);
void recv()
boost::asio::async_read(
socket_, buffer_, boost::asio::transfer_exactly(9),
boost::asio::bind_executor(
strand_, [this](const boost::system::error_code &error, std::size_t length)
if (!error)
std::istream buffer(&buffer_);
std::vector<char> msg(length, 0);
buffer.readsome(msg.data(), length);
std::string recvMsg(msg.begin(), msg.end());
std::cout << recvMsg << std::endl;
send_request(recvMsg);
boost::asio::post(strand_, std::bind(&client::recv, this));
else
std::cout << "Read failed: " << error.message() << std::endl;
));
boost::asio::ssl::stream<tcp::socket> socket_;
boost::asio::streambuf buffer_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
;
int main(int argc, char *argv[])
try
if (argc != 3)
std::cerr << "Usage: client <host> <port>\n";
return 1;
boost::asio::io_context io_context;
tcp::resolver resolver(io_context);
auto endpoints = resolver.resolve(argv[1], argv[2]);
boost::asio::ssl::context ctx(boost::asio::ssl::context::sslv23);
ctx.load_verify_file("ca.pem");
client c(io_context, ctx, endpoints);
boost::thread_group threadPool;
for (size_t i = 0; i < boost::thread::hardware_concurrency(); ++i)
threadPool.create_thread(boost::bind(&boost::asio::io_context::run, &io_context));
io_context.run();
catch (std::exception &e)
std::cerr << "Exception: " << e.what() << "\n";
return 0;
当数据没有发送到服务器时,客户端会这样打印
hello ssl
hello ssl
send data success, size : 9
send data success, size : 9
【问题讨论】:
大部分代码来自 boost 示例 【参考方案1】:先看这一点:如果去掉 thread_group(据我所见,它并没有带来任何好处),一切都能正常工作。这很好地说明您遇到的是线程方面的错误。
在真正看到问题之前,我不太想直接读代码,所以让我们逐步缩小范围。
添加 ASAN/UBSAN 不会立即显示出任何问题,这很好。
那么让我看一下代码。
session 创建了一个 m_strand,但从未使用过它……
您还忘了 join 额外创建的线程。
现在我注意到围绕 strand 的使用可能存在一些混淆,于是查看了客户端对 strand 的用法,发现它并不一致:
- 套接字本身并不在 strand 上;
- send_request 既不在 strand 上运行,也没有把完成处理程序绑定到 strand 的执行器;
- 通信是全双工的(即 async_write 和 async_read 会同时进行);
- 这意味着,即使 client::recv 被 post 到 strand 上,它实际上也没有同步各线程对 socket_ 的访问(因为 send_request 一开始就没有绑定到 strand)。
如果上述情况让您感到意外,您并不是第一个掉进这个坑的人,参见:Why is `net::dispatch` needed when the I/O object already has an executor?。在您的示例中,connect() 和 handshake() 可以认为是安全的,因为它们构成了一个逻辑上的 strand(顺序执行流)。问题出在并发执行的路径上。
到目前为止,解决这个问题最简单的方法似乎是用 strand_ 来构造 socket_。这意味着需要调整成员的声明顺序,使 strand_ 先被初始化:
client(boost::asio::io_context& io_context, ssl::context& context,
const tcp::resolver::results_type& endpoints)
: strand_(io_context.get_executor())
, socket_(strand_, context)
接下来,所有对 strand 的 post 调用都可以去掉,因为它们总是在该 strand 上的完成处理程序中发生。
send_request("hello ssl");
recv(); // already on the strand in this completion handler
略带讽刺的是,send_request 原本就是在“自己已经运行在 strand 上”这一隐含假设下执行的。
到目前为止清理的程序是
文件client.cpp
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/thread.hpp>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
using boost::asio::ip::tcp;
using boost::system::error_code;
using std::placeholders::_1;
using std::placeholders::_2;
namespace ssl = boost::asio::ssl;
class client
public:
client(boost::asio::io_context& io_context, ssl::context& context,
const tcp::resolver::results_type& endpoints)
: strand_(io_context.get_executor())
, socket_(strand_, context)
socket_.set_verify_mode(ssl::verify_peer);
socket_.set_verify_callback(
std::bind(&client::verify_certificate, this, _1, _2));
connect(endpoints);
private:
bool verify_certificate(bool preverified, ssl::verify_context& ctx)
char subject_name[256];
X509* cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
std::cout << "Verifying " << subject_name << "\n";
return true;
void connect(const tcp::resolver::results_type& endpoints)
async_connect( //
socket_.lowest_layer(), endpoints,
bind_executor(
strand_, [this](error_code error, const tcp::endpoint&)
if (!error)
handshake();
else
std::cout << "Connect failed: " << error.message() << "\n";
));
void handshake()
socket_.async_handshake(
ssl::stream_base::client,
bind_executor(strand_, [this](error_code error)
if (!error)
send_request("hello ssl");
recv(); // already on the strand in this completion handler
else
std::cout << "Handshake failed: " << error.message()
<< "\n";
));
void send_request(std::string const& msg)
msg_ = msg;
async_write(
socket_, boost::asio::buffer(msg_),
bind_executor(
strand_, [/*this*/](error_code error, std::size_t length)
if (!error)
std::cout << "send data success, size : " << length << std::endl;
else
std::cout << "Write failed: " << error.message() << std::endl;
));
void recv()
async_read(
socket_, buffer_, boost::asio::transfer_exactly(9),
boost::asio::bind_executor(
strand_, [this](error_code error, std::size_t length)
if (!error)
std::istream buffer(&buffer_);
std::vector<char> msg(length, 0);
buffer.readsome(msg.data(), length);
msg_.assign(msg.begin(), msg.end());
std::cout << msg_ << std::endl;
send_request(msg_);
recv(); // already on the strand in this completion handler
else
std::cout << "Read failed: " << error.message() << std::endl;
));
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
ssl::stream<tcp::socket> socket_;
boost::asio::streambuf buffer_;
std::string msg_;
;
int main(int argc, char* argv[])
try
if (argc != 3)
std::cerr << "Usage: client <host> <port>\n";
return 1;
boost::asio::io_context io_context;
tcp::resolver resolver(io_context);
auto endpoints = resolver.resolve(argv[1], argv[2]);
ssl::context ctx(ssl::context::sslv23);
ctx.load_verify_file("ca.pem");
client c(io_context, ctx, endpoints);
boost::thread_group threadPool;
for (size_t i = 0; i < boost::thread::hardware_concurrency(); ++i)
threadPool.create_thread(
boost::bind(&boost::asio::io_context::run, &io_context));
threadPool.join_all();
//io_context.run();
return 0;
catch (std::exception const& e)
std::cerr << "Exception: " << e.what() << "\n";
return 1;
文件server.cpp
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/bind/bind.hpp>
#include <cstdlib>
#include <functional>
#include <iostream>
namespace ssl = boost::asio::ssl;
using boost::asio::ip::tcp;
using boost::system::error_code;
class session : public std::enable_shared_from_this<session>
public:
session(tcp::socket socket, ssl::context& context)
: socket_(std::move(socket), context)
, m_strand(socket.get_executor())
void start()
do_handshake();
private:
void do_handshake()
auto self(shared_from_this());
socket_.async_handshake(ssl::stream_base::server,
[this, self](error_code error)
if (!error)
do_read();
);
void do_read()
auto self(shared_from_this());
socket_.async_read_some(
boost::asio::buffer(data_),
[this, self](error_code ec, std::size_t length)
if (!ec)
std::cout << "get <";
std::cout.write(data_, length);
std::cout << std::endl;
do_write(length);
);
void do_write(std::size_t length)
auto self(shared_from_this());
std::cout << "send <";
std::cout.write(data_, length);
std::cout << std::endl;
boost::asio::async_write(
socket_, boost::asio::buffer(data_, length),
[this, self](error_code ec, std::size_t /*length*/)
if (!ec)
do_read();
);
ssl::stream<tcp::socket> socket_;
boost::asio::strand<tcp::socket::executor_type> m_strand;
char data_[1024];
;
class server
public:
server(boost::asio::io_context& io_context, unsigned short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
, context_(ssl::context::sslv23)
context_.set_options(ssl::context::default_workarounds |
ssl::context::no_sslv2 |
ssl::context::single_dh_use);
context_.set_password_callback(std::bind(&server::get_password, this));
context_.use_certificate_chain_file("server.pem");
context_.use_private_key_file("server.pem", ssl::context::pem);
context_.use_tmp_dh_file("dh2048.pem");
do_accept();
private:
std::string get_password() const
return "test";
void do_accept()
acceptor_.async_accept([this](error_code error, tcp::socket socket)
if (!error)
std::make_shared<session>(std::move(socket), context_)->start();
do_accept();
);
tcp::acceptor acceptor_;
ssl::context context_;
;
int main(int argc, char* argv[])
try
if (argc != 2)
std::cerr << "Usage: server <port>\n";
return 1;
boost::asio::io_context io_context;
server s(io_context, std::atoi(argv[1]));
io_context.run();
return 0;
catch (std::exception const& e)
std::cerr << "Exception: " << e.what() << "\n";
return 1;
其他问题
生命周期错误
UBSAN/ASAN 没有发现,但这是错误的:
void send_request(const std::string& msg)
async_write(
socket_, boost::asio::buffer(msg),
...
问题在于 msg 的生命周期:它在异步操作有机会运行之前就已经销毁了,更不用说等到操作完成。因此,应当把缓冲区移到生命周期足够长的地方(例如成员变量 msg_)。
并发写入
当客户端卡住时,它会显示:
send data success, size : 9
hello ssl
hello ssl
send data success, size : 9
send data success, size : 9
这表明在发送被启动之前就已经收到了第二个 hello ssl,这意味着又启动了第二次发送。在底层,这会取消 ssl 流内部用于双工同步的对象。您可以通过 -DBOOST_ASIO_ENABLE_HANDLER_TRACKING 看到这一点:
@asio|1630155694.209267|51139|deadline_timer@0x7ffc6fa61e48.cancel
使用 handlerviz.pl 脚本可以将其可视化:
问题在于违反了这里的要求(the requirements here):
程序必须确保在此操作完成之前,流不会执行任何其他写入操作(例如 async_write、流的 async_write_some 函数,或任何其他执行写入的组合操作)。
两种简单的修复方法:
- 将 IO 从全双工改为顺序的 读/写/读/写,就像服务器那样;
- 创建一个输出队列,存放仍待按顺序写入的消息。
修复后的解决方案
这里采用了发件箱(outbox),即上面的第二种方案,来处理重叠的写入。我还顺便做了以下改动:
- 去掉了不必要的中间缓冲区 streambuf buffer_,改为直接读入字符串;
- 用更优雅的 thread_pool 取代了 io_context + thread_group;
- 许多小的改进(其中一些已在上文提到)。
文件client.cpp
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/thread.hpp>
#include <cstdlib>
#include <cstring>
#include <deque>
#include <functional>
#include <iostream>
#include <string>
#include <utility>
using boost::asio::ip::tcp;
using boost::system::error_code;
using std::placeholders::_1;
using std::placeholders::_2;
namespace ssl = boost::asio::ssl;
using Executor = boost::asio::thread_pool::executor_type;
class client
public:
client(Executor ex, ssl::context& context,
const tcp::resolver::results_type& endpoints)
: strand_(ex)
, socket_(strand_, context)
socket_.set_verify_mode(ssl::verify_peer);
socket_.set_verify_callback(
std::bind(&client::verify_certificate, this, _1, _2));
connect(endpoints);
private:
bool verify_certificate(bool preverified, ssl::verify_context& ctx)
char subject_name[256];
X509* cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
std::cout << "Verifying " << subject_name << "\n";
return true;
void connect(const tcp::resolver::results_type& endpoints)
async_connect( //
socket_.lowest_layer(), endpoints,
bind_executor(
strand_, [this](error_code error, const tcp::endpoint&)
if (!error)
handshake();
else
std::cout << "Connect failed: " << error.message() << "\n";
));
void handshake()
socket_.async_handshake(
ssl::stream_base::client,
bind_executor(strand_, [this](error_code error)
if (!error)
send_request("hello ssl");
recv(); // already on the strand in this completion handler
else
std::cout << "Handshake failed: " << error.message()
<< "\n";
));
void send_request(std::string msg)
outbox_.push_back(std::move(msg));
if (outbox_.size() == 1)
send_loop();
void send_loop()
async_write( //
socket_, boost::asio::buffer(outbox_.back()),
bind_executor(
strand_, [this](error_code error, std::size_t length)
if (!error)
std::cout << "send data success, size : " << length << std::endl;
outbox_.pop_back();
else
std::cout << "Write failed: " << error.message() << std::endl;
if (!outbox_.empty())
send_loop();
));
void recv()
async_read(
socket_, boost::asio::dynamic_buffer(buffer_), boost::asio::transfer_exactly(9),
boost::asio::bind_executor(
strand_, [this](error_code error, std::size_t length)
if (!error)
std::cout << buffer_ << std::endl;
send_request(std::move(buffer_));
recv(); // already on the strand in this completion handler
else
std::cout << "Read failed: " << error.message() << std::endl;
));
boost::asio::strand<Executor> strand_;
ssl::stream<tcp::socket> socket_;
std::string buffer_;
std::deque<std::string> outbox_;
;
int main(int argc, char* argv[])
try
if (argc != 3)
std::cerr << "Usage: client <host> <port>\n";
return 1;
ssl::context ctx(ssl::context::sslv23);
ctx.load_verify_file("ca.pem");
boost::asio::thread_pool io;
tcp::resolver resolver(io);
client c(io.get_executor(), ctx, resolver.resolve(argv[1], argv[2]));
io.join();
return 0;
catch (std::exception const& e)
std::cerr << "Exception: " << e.what() << "\n";
return 1;
文件server.cpp
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/bind/bind.hpp>
#include <cstdlib>
#include <functional>
#include <iostream>
namespace ssl = boost::asio::ssl;
using boost::asio::ip::tcp;
using boost::system::error_code;
class session : public std::enable_shared_from_this<session>
public:
session(tcp::socket socket, ssl::context& context)
: socket_(std::move(socket), context)
void start()
do_handshake();
private:
void do_handshake()
auto self(shared_from_this());
socket_.async_handshake(ssl::stream_base::server,
[this, self](error_code error)
if (!error)
do_read();
);
void do_read()
auto self(shared_from_this());
socket_.async_read_some(
boost::asio::buffer(data_),
[this, self](error_code ec, std::size_t length)
if (!ec)
std::cout << "get <";
std::cout.write(data_.data(), length);
std::cout << std::endl;
do_write(length);
);
void do_write(std::size_t length)
auto self(shared_from_this());
std::cout << "send <";
std::cout.write(data_.data(), length);
std::cout << std::endl;
boost::asio::async_write(
socket_, boost::asio::buffer(data_.data(), length),
[this, self](error_code ec, std::size_t /*length*/)
if (!ec)
do_read();
);
ssl::stream<tcp::socket> socket_;
std::array<char, 1024> data_;
;
class server
public:
server(boost::asio::io_context& io_context, unsigned short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
, context_(ssl::context::sslv23)
context_.set_options(ssl::context::default_workarounds |
ssl::context::no_sslv2 |
ssl::context::single_dh_use);
context_.set_password_callback(std::bind(&server::get_password, this));
context_.use_certificate_chain_file("server.pem");
context_.use_private_key_file("server.pem", ssl::context::pem);
context_.use_tmp_dh_file("dh2048.pem");
do_accept();
private:
std::string get_password() const
return "test";
void do_accept()
acceptor_.async_accept([this](error_code error, tcp::socket socket)
if (!error)
std::make_shared<session>(std::move(socket), context_)->start();
do_accept();
);
tcp::acceptor acceptor_;
ssl::context context_;
;
int main(int argc, char* argv[])
try
if (argc != 2)
std::cerr << "Usage: server <port>\n";
return 1;
boost::asio::io_context io_context;
server s(io_context, std::atoi(argv[1]));
io_context.run();
return 0;
catch (std::exception const& e)
std::cerr << "Exception: " << e.what() << "\n";
return 1;
现场演示:
如您所见(这里使用 uniq -dc 技巧过滤掉所有不重复的行),现在即使在启动发送之前连续收到多条消息,程序也能顺利地继续运行。
【讨论】:
请注意,这是一篇很长的回答。请务必读到最后,因为后半部分发现的问题很重要。
无论是解决方案还是分析过程都非常有帮助,谢谢。
以上是关于“boost::asio::async_write 写入 ssl::stream 成功但服务器未获取”的主要内容,如果未能解决你的问题,请参考以下文章:
boost asio async_write:如何不交错 async_write 调用?