在 boost::asio 中使用具有可调整大小的 streambuf 的相同 istream
Posted
技术标签:
【中文标题】在 boost::asio 中使用具有可调整大小的 streambuf 的相同 istream【英文标题】:Using the same istream with a resizeable streambuf in boost::asio 【发布时间】:2016-07-06 17:07:43 【问题描述】:我正在使用 boost asio async_read() 从套接字读取数据。我有以下阅读器类的成员,它们在程序的整个生命周期中都存在:
boost::asio::streambuf recv_data;
std::istream stream_is(&recv_data);
async_read 调用如下所示:
boost::asio::async_read(ep_ptr->get_sock(), recv_data, boost::asio::transfer_exactly(n),
boost::bind(&IProtocol::handle_data_recv, &protocol,
boost::asio::placeholders::error));
我的问题是：如果我要从套接字读取 n 个字节，而 streambuf 当前的容量小于 n、因此它会自动扩容，这时会发生什么？由于 std::istream 所持有的内部流缓冲区此时可能已被释放/重新分配，我是否需要重新创建 std::istream？
【问题讨论】:
半相关的建议：你可能想写成 boost::bind(&IProtocol::handle_data_recv, protocol.shared_from_this(), boost::asio::placeholders::error) 或类似的形式，以强制保证对象的生命周期——不过取决于程序的结构，你可能已经隐式地保证了这一点。当然，这要求你使用 shared_ptr 并从 enable_shared_from_this 派生。 观察得很好，谢谢——但正如你所说，协议对象同样在程序的整个生命周期内存在。所有这些对象都是在调用 io_service.run() 之前于 main() 中创建的。 【参考方案1】:不需要。幸运的是，这种绑定并不关心 recv_data 的内部存储是否会被重新分配，它绑定的是 recv_data 对象本身。下面是我编写的一个下载器的可运行示例，可以看到我并没有在两次读取之间重新分配缓冲区。
同样的道理，你可以安全地共享对 vector 的引用，而不必关心 vector 的内部存储是否被重新分配（除非你直接持有 vector 元素的内存地址，或者在迭代器失效后继续使用它们）。vector 的句柄仍然有效；同理，istream 所持有的 streambuf 句柄也仍然有效，可以正常工作。
下载.h
#ifndef _MV_DOWNLOAD_H_
#define _MV_DOWNLOAD_H_
#include <string>
#include <iostream>
#include <istream>
#include <ostream>
#include <fstream>
#include <algorithm>
#include "Network/url.h"
#include "Utility/generalUtility.h"
#include <boost/asio.hpp>
#include <boost/bind.hpp>
namespace MV
/// Parsed status line and header fields of a single HTTP response.
struct HttpHeader
{
    std::string version;   ///< First whitespace-delimited token of the status line.
    int status = 0;        ///< Numeric status code; read() leaves it 0 when it cannot be parsed.
    std::string message;   ///< Remainder of the status line after the status code.
    std::map<std::string, std::string> values; ///< Header fields; read() stores the keys lower-cased.
    std::vector<std::string> bounces;          ///< URLs recorded by DownloadRequest while following redirects.

    bool success = false;       ///< Set by DownloadRequest to report the outcome of the transfer.
    std::string errorMessage;   ///< Human-readable failure description; empty on success.

    // Previously had no initializer: a response without a Content-Length header left this
    // indeterminate, and DownloadRequest::handleReadHeaders reads it unconditionally.
    size_t contentLength = 0;

    HttpHeader() {}

    /// Convenience constructor: parses the header straight from @p response_stream.
    HttpHeader(std::istream& response_stream)
    {
        read(response_stream);
    }

    /// Parses the status line and header fields from @p response_stream.
    void read(std::istream& response_stream);
};
inline std::ostream& operator<<(std::ostream& os, const HttpHeader& obj)
os << "\\/______HTTP_HEADER______\\/\nVersion [" << obj.version << "] Status [" << obj.status << "] Message [" << obj.message << "]\n";
os << "||-----------------------||\n";
for (auto&& kvp : obj.values)
os << "[" << kvp.first << "]: " << kvp.second << "\n";
os << "\n||--------Bounces--------||\n";
for (size_t i = 0; i < obj.bounces.size(); ++i)
os << i << ": " << obj.bounces[i] << "\n";
os << "/\\_______________________/\\" << std::endl;
return os;
inline std::istream& operator>>(std::istream& a_is, HttpHeader& a_obj)
a_obj.read(a_is);
return a_is;
class DownloadRequest : public std::enable_shared_from_this<DownloadRequest>
public:
static std::shared_ptr<DownloadRequest> make(const MV::Url& a_url, const std::shared_ptr<std::ostream> &a_streamOutput)
auto result = std::shared_ptr<DownloadRequest>(new DownloadRequest(a_streamOutput));
result->perform(a_url);
return result;
//onComplete is called on success or error at the end of the download.
static std::shared_ptr<DownloadRequest> make(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::shared_ptr<std::ostream> &a_streamOutput, std::function<void (std::shared_ptr<DownloadRequest>)> a_onComplete)
auto result = std::shared_ptr<DownloadRequest>(new DownloadRequest(a_streamOutput));
result->onComplete = a_onComplete;
result->ioService = a_ioService;
result->perform(a_url);
return result;
HttpHeader& header()
return headerData;
MV::Url finalUrl()
return currentUrl;
MV::Url inputUrl()
return originalUrl;
private:
DownloadRequest(const std::shared_ptr<std::ostream> &a_streamOutput) :
streamOutput(a_streamOutput)
void perform(const MV::Url& a_url);
bool initializeSocket();
void initiateRequest(const MV::Url& a_url);
void handleResolve(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
void handleConnect(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
void handleWriteRequest(const boost::system::error_code& err);
void handleReadHeaders(const boost::system::error_code& err);
void handleReadContent(const boost::system::error_code& err);
void readResponseToStream()
(*streamOutput) << &(*response);
std::shared_ptr<boost::asio::io_service> ioService;
std::unique_ptr<boost::asio::ip::tcp::resolver> resolver;
std::unique_ptr<boost::asio::ip::tcp::socket> socket;
std::unique_ptr<std::istream> responseStream;
std::unique_ptr<boost::asio::streambuf> request;
std::unique_ptr<boost::asio::streambuf> response;
std::shared_ptr<std::ostream> streamOutput;
HttpHeader headerData;
MV::Url currentUrl;
MV::Url originalUrl;
std::function<void(std::shared_ptr<DownloadRequest>)> onComplete;
;
std::string DownloadString(const MV::Url& a_url);
HttpHeader DownloadFile(const MV::Url& a_url, const std::string &a_path);
void DownloadFile(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>());
void DownloadFiles(const std::vector<MV::Url>& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>());
void DownloadFiles(const std::shared_ptr<boost::asio::io_service> &a_ioService, const std::vector<MV::Url>& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>(), std::function<void()> a_onAllComplete = std::function<void()>());
#endif
下载.cpp
#include "download.h"
#include <boost/filesystem.hpp>
#include <atomic>
namespace MV
void HttpHeader::read(std::istream& response_stream)
values.clear();
response_stream >> version;
std::string status_code;
response_stream >> status_code;
try
status = std::stoi(status_code);
catch (...)
status = 0;
getline_platform_agnostic(response_stream, message);
if (!message.empty() && message[0] == ' ') message = message.substr(1);
std::string header;
while (getline_platform_agnostic(response_stream, header) && !header.empty())
auto index = header.find_first_of(':');
if (index != std::string::npos && index > 0)
auto key = header.substr(0, index);
auto value = (index + 2 >= header.size()) ? "" : header.substr(index + 2);
std::transform(key.begin(), key.end(), key.begin(), [](char c) return std::tolower(c); );
values[key] = value;
if (toLower(key) == "content-length")
try
contentLength = static_cast<size_t>(stol(value));
catch (std::exception &e)
std::cerr << e.what() << std::endl;
contentLength = 0;
std::string DownloadString(const Url& a_url)
auto result = std::make_shared<std::stringstream>();
if (DownloadRequest::make(a_url, result)->header().success)
return result->str();
else
return "";
/// Downloads @p a_url to the file @p a_path and returns the response header.
///
/// Missing parent directories of @p a_path are created first. When the header
/// reports failure, the partially written file is removed.
MV::HttpHeader DownloadFile(const Url& a_url, const std::string &a_path)
{
    // A bare file name has an empty parent path, which create_directories rejects.
    const boost::filesystem::path parentDirectory = boost::filesystem::path(a_path).parent_path();
    if (!parentDirectory.empty())
    {
        boost::filesystem::create_directories(parentDirectory);
    }

    auto outFile = std::make_shared<std::ofstream>(a_path, std::ofstream::out | std::ofstream::binary);
    auto request = DownloadRequest::make(a_url, outFile);
    HttpHeader header = request->header();

    // Close before touching the file on disk: removing a file that is still
    // open fails on Windows, and closing also flushes buffered data.
    outFile->close();
    if (!header.success)
    {
        std::remove(a_path.c_str());
    }
    return header;
}
/// Starts a download of @p a_url to the file @p a_path on @p a_ioService.
///
/// Missing parent directories of @p a_path are created first. When the
/// request finishes, the output file is closed, removed if the header reports
/// failure, and then @p a_onComplete (if set) is invoked with the request.
void DownloadFile(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete)
{
    // A bare file name has an empty parent path, which create_directories rejects.
    const boost::filesystem::path parentDirectory = boost::filesystem::path(a_path).parent_path();
    if (!parentDirectory.empty())
    {
        boost::filesystem::create_directories(parentDirectory);
    }

    auto outFile = std::make_shared<std::ofstream>(a_path, std::ofstream::out | std::ofstream::binary);
    // outFile is captured so the completion handler can close it; the request
    // itself also keeps the stream alive through its own shared_ptr.
    DownloadRequest::make(a_ioService, a_url, outFile, [a_path, a_onComplete, outFile](std::shared_ptr<DownloadRequest> a_result)
    {
        // Close first: removing a file that is still open fails on Windows, and
        // closing flushes the data before the caller's callback looks at it.
        outFile->close();
        if (!a_result->header().success)
        {
            std::remove(a_path.c_str());
        }
        if (a_onComplete) a_onComplete(a_result);
    });
}
/// Downloads every URL in @p a_urls into @p a_path on a private io_service and
/// blocks until that service has no more work.
/// Each target file is @p a_path with the URL path's file name appended directly.
void DownloadFiles(const std::vector<MV::Url>& a_urls, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete)
{
    auto service = std::make_shared<boost::asio::io_service>();
    for (auto url = a_urls.begin(); url != a_urls.end(); ++url)
    {
        const std::string fileName = boost::filesystem::path(url->path()).filename().string();
        DownloadFile(service, *url, a_path + fileName, a_onComplete);
    }
    service->run();
}
/// Queues a download for every URL in @p a_urls on @p a_ioService.
///
/// @p a_onComplete (if set) is invoked once per finished request;
/// @p a_onAllComplete (if set) is invoked once, after the last request finishes.
/// Each target file is @p a_path with the URL path's file name appended directly.
/// NOTE(review): with an empty @p a_urls nothing is queued, so @p a_onAllComplete
/// is never invoked — confirm whether callers rely on that.
void DownloadFiles(const std::shared_ptr<boost::asio::io_service> &a_ioService, const std::vector<MV::Url>& a_urls, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete, std::function<void()> a_onAllComplete)
{
    const size_t totalFiles = a_urls.size();
    // One counter shared by every request. It used to be created inside the
    // loop, so each request had its own counter that only ever reached 1 and
    // a_onAllComplete fired only when exactly one URL was given.
    auto counter = std::make_shared<std::atomic<size_t>>(0);
    for (const auto& url : a_urls)
    {
        DownloadFile(a_ioService, url, a_path + boost::filesystem::path(url.path()).filename().string(), [a_onComplete, a_onAllComplete, counter, totalFiles](std::shared_ptr<DownloadRequest> a_request)
        {
            // Both callbacks default to empty std::function objects; calling an
            // empty one throws std::bad_function_call.
            if (a_onComplete) a_onComplete(a_request);
            const bool wasLast = (++(*counter) == totalFiles);
            if (wasLast && a_onAllComplete) a_onAllComplete();
        });
    }
}
/// Completion handler for the body read started by handleReadHeaders().
///
/// On success, or when the peer closed the connection (eof), the buffered body
/// bytes are written to the output stream and onComplete is notified. Any other
/// error marks the header as failed before notifying.
void DownloadRequest::handleReadContent(const boost::system::error_code& err)
{
    if (!err || err == boost::asio::error::eof)
    {
        // eof used to fall through silently: bytes received before the peer
        // closed were never written out and onComplete was never called. The
        // request is sent with "Connection: close", so eof is how a body
        // normally ends.
        // NOTE(review): a body cut short of Content-Length is not detected here.
        if (response->size() > 0) // inserting an empty streambuf would set failbit on the output
        {
            readResponseToStream();
        }
        if (onComplete) onComplete(shared_from_this());
    }
    else
    {
        headerData.success = false;
        headerData.errorMessage = "Download Read Content Failure: " + err.message();
        std::cerr << headerData.errorMessage << std::endl;
        if (onComplete) onComplete(shared_from_this());
    }
}
/// Completion handler for the header read started by handleWriteRequest().
///
/// Parses the response header, follows a 3xx redirect that carries a Location
/// field (at most 32 bounces), and otherwise writes the body bytes that arrived
/// with the header and schedules a read for the rest of the body.
void DownloadRequest::handleReadHeaders(const boost::system::error_code& err)
{
    if (err)
    {
        headerData.success = false;
        headerData.errorMessage = "Download Read Header Failure: " + err.message();
        std::cerr << headerData.errorMessage << std::endl;
        if (onComplete) onComplete(shared_from_this());
        return;
    }

    responseStream = std::make_unique<std::istream>(&(*response));
    headerData.read(*responseStream);
    headerData.success = true;
    headerData.errorMessage = "";

    if (headerData.status >= 300 && headerData.status < 400 && headerData.bounces.size() < 32 && headerData.values.find("location") != headerData.values.end())
    {
        headerData.bounces.push_back(currentUrl.toString());
        initiateRequest(headerData.values["location"]);
        return;
    }

    // Whatever is left in the buffer after parsing the header is body data that
    // arrived in the same read.
    const size_t bodyBytesBuffered = response->size();
    if (bodyBytesBuffered > 0)
    {
        readResponseToStream();
    }

    const auto onContent = boost::bind(&DownloadRequest::handleReadContent, shared_from_this(), boost::asio::placeholders::error);
    const bool lengthKnown = headerData.values.find("content-length") != headerData.values.end();
    if (!lengthKnown)
    {
        // No Content-Length: contentLength carries no information, so read until
        // the peer closes the connection; the handler is then invoked with eof.
        boost::asio::async_read(*socket, *response, boost::asio::transfer_all(), onContent);
    }
    else if (headerData.contentLength > bodyBytesBuffered)
    {
        // Compare before subtracting: the unsigned difference used to wrap to a
        // huge value whenever more bytes were buffered than Content-Length declared.
        const size_t amountLeftToRead = headerData.contentLength - bodyBytesBuffered;
        boost::asio::async_read(*socket, *response, boost::asio::transfer_at_least(amountLeftToRead), onContent);
    }
    else
    {
        if (onComplete) onComplete(shared_from_this());
    }
}
/// Completion handler for the request write started by handleConnect().
/// On success it reads the response up to the blank line that ends the header;
/// on failure it records the error and notifies onComplete.
void DownloadRequest::handleWriteRequest(const boost::system::error_code& err)
{
    if (err)
    {
        headerData.success = false;
        headerData.errorMessage = "Download Write Failure: " + err.message();
        std::cerr << headerData.errorMessage << std::endl;
        if (onComplete) onComplete(shared_from_this());
        return;
    }

    boost::asio::async_read_until(
        *socket,
        *response,
        "\r\n\r\n",
        boost::bind(&DownloadRequest::handleReadHeaders, shared_from_this(), boost::asio::placeholders::error));
}
/// Completion handler for async_connect.
/// Success: sends the prepared request. Failure with endpoints remaining:
/// retries on the next endpoint. Failure with none left: records the error and
/// notifies onComplete.
void DownloadRequest::handleConnect(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
{
    if (!err)
    {
        // The connection was successful. Send the request.
        boost::asio::async_write(*socket, *request, boost::bind(&DownloadRequest::handleWriteRequest, shared_from_this(), boost::asio::placeholders::error));
        return;
    }

    const bool endpointsExhausted = (endpoint_iterator == boost::asio::ip::tcp::resolver::iterator());
    if (endpointsExhausted)
    {
        headerData.success = false;
        headerData.errorMessage = "Download Connection Failure: " + err.message();
        std::cerr << headerData.errorMessage << std::endl;
        if (onComplete) onComplete(shared_from_this());
        return;
    }

    // The connection failed. Try the next endpoint in the list.
    socket->close();
    boost::asio::ip::tcp::endpoint candidate = *endpoint_iterator;
    ++endpoint_iterator; // the handler receives the iterator that follows `candidate`
    socket->async_connect(candidate, boost::bind(&DownloadRequest::handleConnect, shared_from_this(), boost::asio::placeholders::error, endpoint_iterator));
}
/// Completion handler for async_resolve.
/// On success it starts connecting to the first resolved endpoint; on failure
/// it records the error and notifies onComplete.
void DownloadRequest::handleResolve(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
{
    if (err)
    {
        headerData.success = false;
        headerData.errorMessage = "Download Resolve Failure: " + err.message();
        std::cerr << headerData.errorMessage << std::endl;
        if (onComplete) onComplete(shared_from_this());
        return;
    }

    // Attempt a connection to the first endpoint in the list. Each endpoint
    // will be tried until we successfully establish a connection.
    boost::asio::ip::tcp::endpoint firstEndpoint = *endpoint_iterator;
    ++endpoint_iterator; // handleConnect receives the iterator that follows `firstEndpoint`
    socket->async_connect(firstEndpoint, boost::bind(&DownloadRequest::handleConnect, shared_from_this(), boost::asio::placeholders::error, endpoint_iterator));
}
/// Begins a fresh GET for @p a_url: closes the socket, replaces the request and
/// response buffers, writes the request text, and starts resolving the host
/// for the "http" service.
void DownloadRequest::initiateRequest(const MV::Url& a_url)
{
    socket->close();
    currentUrl = a_url;

    // Fresh buffers for every request, so nothing from a previous (redirected)
    // exchange is left over.
    request = std::make_unique<boost::asio::streambuf>();
    response = std::make_unique<boost::asio::streambuf>();

    std::ostream requestStream(request.get());
    requestStream << "GET " << a_url.pathAndQuery() << " HTTP/1.1\r\n"
                  << "Host: " << a_url.host() << "\r\n"
                  << "Accept: */*\r\n"
                  << "Connection: close\r\n\r\n";

    boost::asio::ip::tcp::resolver::query query(a_url.host(), "http");
    resolver->async_resolve(query, boost::bind(&DownloadRequest::handleResolve, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::iterator));
}
bool DownloadRequest::initializeSocket()
bool created = false;
if (!ioService)
ioService = std::make_shared<boost::asio::io_service>();
created = true;
resolver = std::make_unique<boost::asio::ip::tcp::resolver>(*ioService);
socket = std::make_unique<boost::asio::ip::tcp::socket>(*ioService);
return created;
/// Entry point used by make(): remembers the original URL, sets up the socket,
/// and starts the request. When no io_service was supplied, the one created
/// here is run before this function returns.
///
/// Any exception that escapes is converted into a failed header; onComplete is
/// notified if one was provided.
void DownloadRequest::perform(const MV::Url& a_url)
{
    originalUrl = a_url;
    try
    {
        bool needToCallRun = initializeSocket();
        initiateRequest(a_url);
        if (needToCallRun)
        {
            ioService->run();
        }
    }
    catch (...)
    {
        headerData.success = false;
        headerData.errorMessage = "Exception thrown to top level.";
        std::cerr << headerData.errorMessage << std::endl;
        // The two-argument make() never sets onComplete; calling the empty
        // std::function here threw std::bad_function_call out of this handler.
        if (onComplete) onComplete(shared_from_this());
    }
}
generalUtility.h(反正是其中的一部分,仅供参考)
/// Reads one line from @p is into @p t, accepting "\n", "\r\n" or a lone "\r"
/// as the line terminator. The terminator is consumed but not stored.
///
/// @p t is cleared first. eofbit is set on @p is only when end of input is
/// reached before any character of the line was read.
/// @return @p is.
inline std::istream& getline_platform_agnostic(std::istream& is, std::string& t)
{
    t.clear();

    // Characters are pulled straight from the stream buffer, one at a time.
    // Direct streambuf access has to be guarded by a sentry object.
    std::istream::sentry guard(is, true);
    std::streambuf* const buffer = is.rdbuf();

    while (true)
    {
        const int ch = buffer->sbumpc();

        if (ch == '\n')
        {
            return is;
        }
        if (ch == '\r')
        {
            // Swallow the '\n' of a "\r\n" pair.
            if (buffer->sgetc() == '\n')
            {
                buffer->sbumpc();
            }
            return is;
        }
        if (ch == EOF)
        {
            // A final line without a terminator is still returned as a line;
            // eofbit is raised only when nothing at all was read.
            if (t.empty())
            {
                is.setstate(std::ios::eofbit);
            }
            return is;
        }

        t += static_cast<char>(ch);
    }
}
/// Returns a copy of @p s with every character passed through std::tolower.
inline std::string toLower(std::string s)
{
    // std::tolower takes an int that must be representable as unsigned char
    // (or be EOF). Passing a plain char that is negative, e.g. a byte >= 0x80
    // where char is signed, is undefined behaviour, so take unsigned char.
    std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return s;
}
url.h
修改(稍微,只是改变了一些命名方案的东西)
https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/include/Poco/URI.h
https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/src/URI.cpp
【讨论】:
以上是关于在 boost::asio 中使用具有可调整大小的 streambuf 的相同 istream的主要内容,如果未能解决你的问题,请参考以下文章
如何使具有动态内容的reactstrap模态可调整大小和可拖动?
具有可拖放可拖动和可调整大小的 Jquery 无法按预期工作
为啥 boost::asio::read 缓冲区数据大小小于读取大小?