在 boost::asio 中使用具有可调整大小的 streambuf 的相同 istream

Posted

技术标签:

【中文标题】在 boost::asio 中使用具有可调整大小的 streambuf 的相同 istream【英文标题】:Using the same istream with a resizeable streambuf in boost::asio 【发布时间】:2016-07-06 17:07:43 【问题描述】:

我正在使用 boost asio async_read() 从套接字读取数据。我有以下阅读器类的成员,它们在程序的整个生命周期中都存在:

boost::asio::streambuf recv_data;
std::istream stream_is(&recv_data);

async_read 调用如下所示:

boost::asio::async_read(ep_ptr->get_sock(), recv_data, boost::asio::transfer_exactly(n),
                              boost::bind(&IProtocol::handle_data_recv, &protocol,
                                          boost::asio::placeholders::error));

我的问题是,如果我从套接字读取“n”个字节并且流缓冲区的大小小于“n”因此它会自行调整大小,会发生什么情况。我是否需要重新创建 std::istream,因为 std::istream 所持有的内部流缓冲区现在可能已被释放/解除分配?

【问题讨论】:

半相关,你可能想做: boost::bind(&IProtocol::handle_data_recv, protocol.shared_from_this(), boost::asio::placeholders::error) 或类似的强制寿命,尽管取决于程序的结构,您可能已经隐含地强制使用寿命。这当然会迫使您使用 shared_ptr 并从 enable_shared_from_this 派生 很好的观察,谢谢 - 但正如您提到的,协议对象在程序的生命周期内也存在。所有这些都是在调用 io_service.run() 之前在 main() 中创建的 【参考方案1】:

不,幸运的是,绑定并不关心 recv_data 的内部可能会被重新分配,而是绑定到 recv_data 对象本身。这是我编写的下载器的工作示例,您可以看到我没有在两次读取之间重新分配缓冲区。

以同样的方式,您可以安全地共享对向量的引用,而不必关心向量的内部是否被重新分配(除非您开始直接指向向量元素的内存地址,或者在它们失效后使用迭代器. 向量的句柄仍然有效,以同样的方式,streambuf 的句柄对 istream 仍然有效,并且工作正常)。

下载.h

#ifndef _MV_DOWNLOAD_H_
#define _MV_DOWNLOAD_H_

#include <string>
#include <iostream>
#include <istream>
#include <ostream>
#include <fstream>
#include <algorithm>
#include "Network/url.h"
#include "Utility/generalUtility.h"
#include <boost/asio.hpp>
#include <boost/bind.hpp>

namespace MV 
    struct HttpHeader 
        std::string version;
        int status = 0;
        std::string message;
        std::map<std::string, std::string> values;

        std::vector<std::string> bounces;

        bool success = false;
        std::string errorMessage;

        size_t contentLength;

        HttpHeader() 
        

        HttpHeader(std::istream& response_stream) 
            read(response_stream);
        

        void read(std::istream& response_stream);
    ;


    inline std::ostream& operator<<(std::ostream& os, const HttpHeader& obj) 
        os << "\\/______HTTP_HEADER______\\/\nVersion [" << obj.version << "] Status [" << obj.status << "] Message [" << obj.message << "]\n";
        os << "||-----------------------||\n";
        for (auto&& kvp : obj.values) 
            os << "[" << kvp.first << "]: " << kvp.second << "\n";
        
        os << "\n||--------Bounces--------||\n";
        for (size_t i = 0; i < obj.bounces.size(); ++i) 
            os << i << ": " << obj.bounces[i] << "\n";
        
        os << "/\\_______________________/\\" << std::endl;
        return os;
    
    inline std::istream& operator>>(std::istream& a_is, HttpHeader& a_obj) 
        a_obj.read(a_is);
        return a_is;
    

    class DownloadRequest : public std::enable_shared_from_this<DownloadRequest> 
    public:
        static std::shared_ptr<DownloadRequest> make(const MV::Url& a_url, const std::shared_ptr<std::ostream> &a_streamOutput) 
            auto result = std::shared_ptr<DownloadRequest>(new DownloadRequest(a_streamOutput));
            result->perform(a_url);
            return result;
        

        //onComplete is called on success or error at the end of the download.
        static std::shared_ptr<DownloadRequest> make(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::shared_ptr<std::ostream> &a_streamOutput, std::function<void (std::shared_ptr<DownloadRequest>)> a_onComplete) 
            auto result = std::shared_ptr<DownloadRequest>(new DownloadRequest(a_streamOutput));
            result->onComplete = a_onComplete;
            result->ioService = a_ioService;
            result->perform(a_url);
            return result;
        

        HttpHeader& header() 
            return headerData;
        

        MV::Url finalUrl() 
            return currentUrl;
        

        MV::Url inputUrl() 
            return originalUrl;
        

    private:
        DownloadRequest(const std::shared_ptr<std::ostream> &a_streamOutput) :
            streamOutput(a_streamOutput) 
        

        void perform(const MV::Url& a_url);

        bool initializeSocket();

        void initiateRequest(const MV::Url& a_url);

        void handleResolve(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
        void handleConnect(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
        void handleWriteRequest(const boost::system::error_code& err);
        void handleReadHeaders(const boost::system::error_code& err);
        void handleReadContent(const boost::system::error_code& err);

        void readResponseToStream() 
            (*streamOutput) << &(*response);
        

        std::shared_ptr<boost::asio::io_service> ioService;
        std::unique_ptr<boost::asio::ip::tcp::resolver> resolver;
        std::unique_ptr<boost::asio::ip::tcp::socket> socket;

        std::unique_ptr<std::istream> responseStream;

        std::unique_ptr<boost::asio::streambuf> request;
        std::unique_ptr<boost::asio::streambuf> response;

        std::shared_ptr<std::ostream> streamOutput;

        HttpHeader headerData;

        MV::Url currentUrl;
        MV::Url originalUrl;

        std::function<void(std::shared_ptr<DownloadRequest>)> onComplete;
    ;

    std::string DownloadString(const MV::Url& a_url);

    HttpHeader DownloadFile(const MV::Url& a_url, const std::string &a_path);
    void DownloadFile(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>());

    void DownloadFiles(const std::vector<MV::Url>& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>());
    void DownloadFiles(const std::shared_ptr<boost::asio::io_service> &a_ioService, const std::vector<MV::Url>& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>(), std::function<void()> a_onAllComplete = std::function<void()>());

#endif

下载.cpp

#include "download.h"
#include <boost/filesystem.hpp>
#include <atomic>

namespace MV
    void HttpHeader::read(std::istream& response_stream) 
        values.clear();

        response_stream >> version;
        std::string status_code;
        response_stream >> status_code;
        try 
            status = std::stoi(status_code);
         catch (...) 
            status = 0;
        

        getline_platform_agnostic(response_stream, message);

        if (!message.empty() && message[0] == ' ')  message = message.substr(1); 

        std::string header;
        while (getline_platform_agnostic(response_stream, header) && !header.empty()) 
            auto index = header.find_first_of(':');
            if (index != std::string::npos && index > 0) 
                auto key = header.substr(0, index);
                auto value = (index + 2 >= header.size()) ? "" : header.substr(index + 2);
                std::transform(key.begin(), key.end(), key.begin(), [](char c) return std::tolower(c); );
                values[key] = value;
                if (toLower(key) == "content-length") 
                    try 
                        contentLength = static_cast<size_t>(stol(value));
                     catch (std::exception &e) 
                        std::cerr << e.what() << std::endl;
                        contentLength = 0;
                    
                
            
        
    

    std::string DownloadString(const Url& a_url) 
        auto result = std::make_shared<std::stringstream>();
        if (DownloadRequest::make(a_url, result)->header().success) 
            return result->str();
         else 
            return "";
        
    

    MV::HttpHeader DownloadFile(const Url& a_url, const std::string &a_path) 
        HttpHeader header;
        
            boost::filesystem::create_directories(boost::filesystem::path(a_path).parent_path());
            auto outFile = std::make_shared<std::ofstream>(a_path, std::ofstream::out | std::ofstream::binary);
            auto request = DownloadRequest::make(a_url, outFile);
            header = request->header();
        
        if (!header.success) 
            std::remove(a_path.c_str());
        
        return header;
    

    void DownloadFile(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete) 
        boost::filesystem::create_directories(boost::filesystem::path(a_path).parent_path());
        auto outFile = std::make_shared<std::ofstream>(a_path, std::ofstream::out | std::ofstream::binary);
        auto request = DownloadRequest::make(a_ioService, a_url, outFile, [a_path, a_onComplete](std::shared_ptr<DownloadRequest> a_result) 
            if (!a_result->header().success) 
                std::remove(a_path.c_str());
            
            if (a_onComplete)  a_onComplete(a_result); 
        );
    

    void DownloadFiles(const std::vector<MV::Url>& a_urls, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete) 
        auto service = std::make_shared<boost::asio::io_service>();
        for (auto&& url : a_urls) 
            DownloadFile(service, url, a_path + boost::filesystem::path(url.path()).filename().string(), a_onComplete);
        
        service->run();
    
    void DownloadFiles(const std::shared_ptr<boost::asio::io_service> &a_ioService, const std::vector<MV::Url>& a_urls, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete, std::function<void()> a_onAllComplete) 
        size_t totalFiles = a_urls.size();
        for (auto&& url : a_urls) 
            auto counter = std::make_shared<std::atomic<size_t>>(0);
            DownloadFile(a_ioService, url, a_path + boost::filesystem::path(url.path()).filename().string(), [=](std::shared_ptr<DownloadRequest> a_request) 
                a_onComplete(a_request);
                if (++(*counter) == totalFiles) 
                    a_onAllComplete();
                
            );
        
    

    void DownloadRequest::handleReadContent(const boost::system::error_code& err) 
        if (!err) 
            readResponseToStream();
            if (onComplete)  onComplete(shared_from_this()); 
         else if (err != boost::asio::error::eof) 
            headerData.success = false;
            headerData.errorMessage = "Download Read Content Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete)  onComplete(shared_from_this()); 
        
    

    void DownloadRequest::handleReadHeaders(const boost::system::error_code& err) 
        if (!err) 
            responseStream = std::make_unique<std::istream>(&(*response));

            headerData.read(*responseStream);
            headerData.success = true;
            headerData.errorMessage = "";
            if (headerData.status >= 300 && headerData.status < 400 && headerData.bounces.size() < 32 && headerData.values.find("location") != headerData.values.end()) 
                headerData.bounces.push_back(currentUrl.toString());
                initiateRequest(headerData.values["location"]);
             else 
                auto amountLeftToRead = headerData.contentLength - response->size();
                if (response->size() > 0) 
                    readResponseToStream();
                
                if (amountLeftToRead > 0) 
                    boost::asio::async_read(*socket, *response, boost::asio::transfer_at_least(amountLeftToRead), boost::bind(&DownloadRequest::handleReadContent, shared_from_this(), boost::asio::placeholders::error));
                 else 
                    if (onComplete)  onComplete(shared_from_this()); 
                
            
         else 
            headerData.success = false;
            headerData.errorMessage = "Download Read Header Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete)  onComplete(shared_from_this()); 
        
    

    void DownloadRequest::handleWriteRequest(const boost::system::error_code& err) 
        if (!err) 
            boost::asio::async_read_until(*socket, *response, "\r\n\r\n", boost::bind(&DownloadRequest::handleReadHeaders, shared_from_this(), boost::asio::placeholders::error));
         else 
            headerData.success = false;
            headerData.errorMessage = "Download Write Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete)  onComplete(shared_from_this()); 
        
    

    void DownloadRequest::handleConnect(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) 
        if (!err) 
            // The connection was successful. Send the request.
            boost::asio::async_write(*socket, *request, boost::bind(&DownloadRequest::handleWriteRequest, shared_from_this(), boost::asio::placeholders::error));
         else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) 
            // The connection failed. Try the next endpoint in the list.
            socket->close();
            boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
            socket->async_connect(endpoint, boost::bind(&DownloadRequest::handleConnect, shared_from_this(), boost::asio::placeholders::error, ++endpoint_iterator));
         else 
            headerData.success = false;
            headerData.errorMessage = "Download Connection Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete)  onComplete(shared_from_this()); 
        
    

    void DownloadRequest::handleResolve(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) 
        if (!err) 
            // Attempt a connection to the first endpoint in the list. Each endpoint
            // will be tried until we successfully establish a connection.
            boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
            socket->async_connect(endpoint, boost::bind(&DownloadRequest::handleConnect, shared_from_this(), boost::asio::placeholders::error, ++endpoint_iterator));
         else 
            headerData.success = false;
            headerData.errorMessage = "Download Resolve Failure: " + err.message();
            std::cerr << headerData.errorMessage << std::endl;
            if (onComplete)  onComplete(shared_from_this()); 
        
    

    void DownloadRequest::initiateRequest(const MV::Url& a_url) 
        socket->close();
        currentUrl = a_url;
        request = std::make_unique<boost::asio::streambuf>();
        response = std::make_unique<boost::asio::streambuf>();
        using boost::asio::ip::tcp;

        std::ostream requestStream(&(*request));
        requestStream << "GET " << a_url.pathAndQuery() << " HTTP/1.1\r\n";
        requestStream << "Host: " << a_url.host() << "\r\n";
        requestStream << "Accept: */*\r\n";
        requestStream << "Connection: close\r\n\r\n";

        tcp::resolver::query query(a_url.host(), "http");
        resolver->async_resolve(query, boost::bind(&DownloadRequest::handleResolve, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::iterator));
    

    bool DownloadRequest::initializeSocket() 
        bool created = false;
        if (!ioService) 
            ioService = std::make_shared<boost::asio::io_service>();
            created = true;
        

        resolver = std::make_unique<boost::asio::ip::tcp::resolver>(*ioService);
        socket = std::make_unique<boost::asio::ip::tcp::socket>(*ioService);

        return created;
    

    void DownloadRequest::perform(const MV::Url& a_url) 
        originalUrl = a_url;
        try 
            bool needToCallRun = initializeSocket();
            initiateRequest(a_url);
            if (needToCallRun) 
                ioService->run();
            
         catch (...) 
            headerData.success = false;
            headerData.errorMessage = "Exception thrown to top level.";
            std::cerr << headerData.errorMessage << std::endl;
            onComplete(shared_from_this());
        
    


generalUtility.h(反正是其中的一部分,仅供参考)

inline std::istream& getline_platform_agnostic(std::istream& is, std::string& t) 
    t.clear();

    // The characters in the stream are read one-by-one using a std::streambuf.
    // That is faster than reading them one-by-one using the std::istream.
    // Code that uses streambuf this way must be guarded by a sentry object.
    // The sentry object performs various tasks,
    // such as thread synchronization and updating the stream state.

    std::istream::sentry se(is, true);
    std::streambuf* sb = is.rdbuf();

    for (;;) 
        int c = sb->sbumpc();
        switch (c) 
        case '\n':
            return is;
        case '\r':
            if (sb->sgetc() == '\n')
                sb->sbumpc();
            return is;
        case EOF:
            // Also handle the case when the last line has no line ending
            if (t.empty())
                is.setstate(std::ios::eofbit);
            return is;
        default:
            t += (char)c;
        
    


inline std::string toLower(std::string s) 
    std::transform(s.begin(), s.end(), s.begin(), [](char c)  return std::tolower(c); );
    return s;

url.h

修改(稍微,只是改变了一些命名方案的东西)

https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/include/Poco/URI.h

https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/src/URI.cpp

【讨论】:

以上是关于在 boost::asio 中使用具有可调整大小的 streambuf 的相同 istream的主要内容,如果未能解决你的问题,请参考以下文章

如何使具有动态内容的reactstrap模态可调整大小和可拖动?

具有可拖放可拖动和可调整大小的 Jquery 无法按预期工作

如何使图像可调整大小的裁剪框

为啥 boost::asio::read 缓冲区数据大小小于读取大小?

创建没有标题栏的窗口,具有可调整大小的边框并且没有虚假的 6px 白色条纹

如何从可调整大小的窗口中删除最小化和最大化按钮?