我不知道为啥使用 boost asio 的代码会导致错误

Posted

技术标签:

【中文标题】我不知道为啥使用 boost asio 的代码会导致错误【英文标题】:I don't know why code using boost asio is causing an error我不知道为什么使用 boost asio 的代码会导致错误 【发布时间】:2020-08-08 12:31:18 【问题描述】:

我创建了主 cpp 文件和三个类来创建异步服务器。 分别为ServerServiceAcceptor。 然而,它们在构建过程中导致了错误,即使在 Visual Studio 2019 环境中没有错误。 我试图修复错误,但是大部分错误发生在其他文件中,所以我自己也想不出来。

main

#include "Server.h"
#include <iostream>
#include <boost/asio.hpp>
#include <thread>
#include <atomic>
#include <memory>
#define DEFAULT_THREAD_SIZE 2;
using namespace boost;
int main()

 unsigned short port_num;
    std::cin >> port_num;
    try 
        Server srv;
        unsigned int threads = std::thread::hardware_concurrency() * 2;
        if (threads == 0) 
            threads = DEFAULT_THREAD_SIZE;
        
        std::cout << "\nPort - " << port_num << "\nServer start\n";
        srv.Start(port_num, threads);
        while (1) 
            std::cin >> srv;
        
    
    catch (system::system_error& e) 
        std::cout << "\nError code: " << e.code() << "\nError Message\n" << e.what();
    
    return 0;

这包括定义 Server 类的 Server.h。

#include "Acceptor.h"
#include <boost/asio.hpp>
#include <thread>
#include <atomic>
#include <memory>
using namespace boost;
class Server

public:
    Server();
    void Start(unsigned short port_num, unsigned int threads);
    void Stop();
    int Command(std::string& str);
private:
    asio::io_service mios;
    std::unique_ptr<asio::io_service::work> mWork;
    std::unique_ptr<Acceptor> mAcceptor;
    std::vector <std::unique_ptr<std::thread>> mThreads;
;

std::istream& operator>>(std::istream& is, Server& srv);

这是实现,Server.cpp。

#include "Server.h"
Server::Server() 
    mWork.reset(new asio::io_service::work(mios));

void Server::Start(unsigned short port_num, unsigned int threads) 
    assert(thread > 0);
    mAcceptor.reset(new Acceptor(mios, port_num));
    mAcceptor->Start();
    for (int i = 0; i < threads; i++) 
        std::unique_ptr<std::thread> th(new std::thread([this]() mios.run(); ));
        mThreads.push_back(std::move(th));
    

void Server::Stop() 
    mAcceptor->Stop();
    mios.stop();

    for (auto& th : mThreads) 
        th->join();
    

int Server::Command(std::string& str) 
    return 0;

std::istream& operator>>(std::istream& is, Server& srv) 
    std::string str;
    is >> str;
    srv.Command(str);
    return is;

这是 Acceptor 类。

#include "Service.h"
#include <boost/asio.hpp>
#include <thread>
#include <atomic>
#include <memory>
using namespace boost;
class Acceptor

public:
    Acceptor(asio::io_service& ios, unsigned short port_num);
    void Start();
    void Stop();
private:
    std::shared_ptr<asio::io_service> mios;
    std::shared_ptr<asio::ip::tcp::acceptor> mAcceptor;
    std::atomic<bool> mIsStopped;
    void InitAccept();
    void OnAccept(const system::error_code ec, std::shared_ptr<asio::ip::tcp::socket> sock);
;
#include "Acceptor.h"
Acceptor::Acceptor(asio::io_service& ios, unsigned short port_num) 
    mios = std::make_shared<asio::io_service>(ios);
    mAcceptor = std::make_shared<asio::ip::tcp::acceptor>(mios, asio::ip::tcp::endpoint(asio::ip::address_v4::any(), port_num));
    mIsStopped = false;

void Acceptor::Start() 
    mAcceptor->listen();
    InitAccept();

void Acceptor::Stop() 
    mIsStopped.store(true);


void Acceptor::InitAccept() 
    std::shared_ptr<asio::ip::tcp::socket> sock(new asio::ip::tcp::socket(mios));
    mAcceptor->async_accept(*sock, [this, sock](const system::error_code& error) OnAccept(error, sock););

void Acceptor::OnAccept(const system::error_code ec, std::shared_ptr<asio::ip::tcp::socket> sock) 
    if (ec.value() == 0 || ER) 
        (new Service(sock))->StartHandling();
    
    else
        std::cout << "Error code:" << ec.value() << "error " << "Error message: " << ec.message() << "\n";
    
    if (!mIsStopped.load()) 
        InitAccept();
    
    else 
        mAcceptor->close();
    

服务类

#define ER true
#include <iostream>
#include <boost/asio.hpp>
#include <thread>
#include <atomic>
#include <memory>
using namespace boost;
class Service

public:
    Service(std::shared_ptr<asio::ip::tcp::socket> sock);
    void StartHandling();
private:
    void OnRequestReceived(const boost::system::error_code& ec, std::size_t bytes_transferred);
    std::string mReponse;
    std::shared_ptr<asio::ip::tcp::socket> mSock;
    asio::streambuf mRequest;
    void OnReponseSent(const system::error_code& ec, std::size_t bytes_transferred);

    void OnFinish();
    std::string ProcessRequest(asio::streambuf& request);
;
#include "Service.h"
Service::Service(std::shared_ptr<asio::ip::tcp::socket> sock)
    mSock = sock;

void Service::StartHandling() 
    asio::async_read_until(mSock, mRequest, '\n', [this](const system::error_code ec, std::size_t bytes_transferred) OnRequestReceived(ec, bytes_transferred); );


void Service::OnRequestReceived(const system::error_code& ec, std::size_t bytes_transferred) 
    if (ec.value() != 0 || ER) 
        std::cout << "Error code:" << ec.value() << "Error message: " << ec.message() << "\n";
        OnFinish();
        return;
    
    mReponse = ProcessRequest(mRequest);
    asio::async_write(mSock, asio::buffer(mReponse), [this](const system::error_code& ec, std::size_t bytes_transferred) OnReponseSent(ec, bytes_transferred); );


void Service::OnReponseSent(const system::error_code& ec, std::size_t bytes_transferred) 
    if (ec.value() != 0 || ER) 
        std::cout << "Error code:" << ec.value() << "Error message: " << ec.message() << "\n";
    
    OnFinish();


void Service::OnFinish() 
    delete this;

std::string Service::ProcessRequest(asio::streambuf& request) 
    std::string reponse;
    std::istream input(&request);
    std::getline(input, reponse);
    assert(reponse.back() == '\n');
    return reponse;

我不知道该怎么做。我想自己做,但我什至无法调试,因为我不知道问题出在哪里,而且没有构建。

【问题讨论】:

有什么错误? 【参考方案1】:

它根本无法编译。我真的很想知道人们怎么能想出 /so much/ 代码,然后才注意到这些东西无法编译。

规则 #1:婴儿步骤(这同样适用于专业人士,只是他们将其内化)。

你正在做这样的事情:

mios = std::make_shared<asio::io_service>(ios);

这要求io_service 是可复制的(它不是)。您可能会将mios 设为参考:

asio::io_service& mios;

似乎到处都在“迷信”地使用 shared_ptr。

事实

assert(thread > 0);

拼写错误的threads 表示您可能一直在构建仅发布版本。

阅读编译器消息:

void Service::StartHandling() 
    asio::async_read_until(mSock, mRequest, '\n', [this](const system::error_code ec, std::size_t bytes_transferred) OnRequestReceived(ec, bytes_transferred); );

这会触发错误:

/home/sehe/custom/boost_1_73_0/boost/asio/impl/read_until.hpp|959 col 53| error: no type named ‘executor_type’ in ‘class std::shared_ptr<boost::asio::basic_stream_socket<boost::asio::ip::tcp> >’

显然你的意思是*mSock。以后一样:

asio::async_write(*mSock, asio::buffer(mReponse), [this](const system::error_code& ec, std::size_t bytes_transferred) OnReponseSent(ec, bytes_transferred); );

指针不是它指向的对象——甚至不是智能指针。智能指针的要点 [原文如此] 不是让 C++ 等于(比如说)Java - 如果你愿意,你应该使用 Java。

用这些编译:Live ON Wandbox

更多评论

*** const 对值参数没有影响

不要使用new or delete

 mWork.reset(new asio::io_service::work(mios));

改用make_unique

 mWork = std::make_unique<asio::io_service::work>(mios);
 // ...
 mAcceptor = std::make_unique<Acceptor>(mios, port_num);

使用标题保护(或#pragma once

不要使用命名空间 using 指令;改用 using 声明

特别是不要在头文件中使用命名空间 using-directives(你自己做 您的用户无法防止/修复名称冲突,这可能会导致编译错误无声的行为改变)

使用构造函数初始化列表(和移动语义):

 Service::Service(std::shared_ptr<asio::ip::tcp::socket> sock)
     mSock = sock;
 

变成

 Service::Service(std::shared_ptr<asio::ip::tcp::socket> sock) 
     : mSock(std::move(sock))
  

这里:

 (new Service(std::move(sock)))->StartHandling();

不要使用 new,不要迷信——使用共享指针,讽刺的是,在 Service 的情况考虑使用enable_shared_from_this 所以你 使用shared_ptr 而不是delete this; 反模式。

初始化您的原始类成员1

 std::atomic<bool> mIsStopped;

没有,它的值是不确定的,使用时通常会导致UB

不要忽略错误:

 if (ec.value() == 0 || ER) 
     (new Service(std::move(sock)))->StartHandling();
 

相反,报告/记录。此外,可移植地检测错误:

 if (!ec) 

或者

 if (!ec.failed()) 

一般来说,处理错误(例如cin &gt;&gt; port_num),

通过 const& 捕获

中间结果(仍然编译):Live on Wandbox

奖金

简化,使用asio::thread_pool,统一初始化

使用bytes_transferred! read_until 确实保证它会在 分隔符,因为 TCP 不是这样工作的。可以存在尾随数据 在缓冲区中。这意味着在 DEBUG 构建中,此断言有时会失败:

 assert(request.back() == '\n');

实际上读取response.back()的代码肯定会失败,因为getline不包含它¯\(ツ)

您可以使用boost::iostreams::restrict 或代替 asio::dynamic_buffer()std::string 上并将 string_view 传递给 处理程序(ProcessRequest):

 mReponse = ProcessRequest(std::string_view(mRequest).substr(0, bytes_transferred));

后来

 #include <boost/iostreams/device/array.hpp>
 #include <boost/iostreams/stream_buffer.hpp>

 std::string Service::ProcessRequest(std::string_view request) 
     assert(request.back() == '\n');
     boost::iostreams::stream_buffer<boost::iostreams::array_source> buf(
         request.data(), request.size());

     std::istream input(&buf);
     std::string reponse;
     std::getline(input, reponse);
     return reponse;
 

摆脱所有多余的共享指针。如果Acceptor 已经 由共享指针管理的动态分配,真的没有必要 通过 shared_ptr 使其拥有 tcp::acceptor 实例。一般来说 所有成员都可能只是代码中的值。只要 周围的物体留在周围(就像你对服务所做的那样)成员是 保证还活着。

mIsStopped 可以通过简单地cancel()-ing 接受器来消除。要获得线程安全,只需发布​​到相关的执行者。

如果您希望服务器在执行停止命令时真正退出,则需要使while(true) 循环具有停止条件,例如

 int Server::Command(std::string const& cmd) 
     std::cout << "Command: " << std::quoted(cmd) << "\n";
     if (cmd == "quit") 
         Stop();
         return 1;
     
     std::cerr << "Unknown command (\"quit\" to exit)" << std::endl;
     return 0;
 

 std::istream& operator>>(std::istream& is, Server& srv) 
     std::string str;
     is >> str;
     if (srv.Command(str)) 
         is.setstate(std::ios::badbit);
     
     return is;
 

main:

     while (std::cin >> srv)  

完整演示:

Live On Wandbox

文件Acceptor.h

 #ifndef _HOME_SEHE_PROJECTS_***_ASIO_ACCEPTOR_H
 #define _HOME_SEHE_PROJECTS_***_ASIO_ACCEPTOR_H

 #include "Service.h"
 class Acceptor 
   public:
     template <typename Executor>
     Acceptor(Executor ex, unsigned short port_num) : mAcceptor(make_strand(ex), , port_num) 
     void Start();
     void Stop();

   private:
     tcp::acceptor mAcceptor;
     void InitAccept();
     void OnAccept(error_code ec, tcp::socket&& sock);
 ;

 #endif

文件Common.h

 #pragma once
 #include <boost/asio.hpp>
 #include <memory>
 #include <thread>
 #include <atomic>

 namespace asio = boost::asio;
 using boost::system::error_code;
 using asio::ip::tcp;

文件Server.h

 #ifndef _HOME_SEHE_PROJECTS_***_ASIO_SERVER_H
 #define _HOME_SEHE_PROJECTS_***_ASIO_SERVER_H

 #include "Acceptor.h"
 class Server 
   public:
     explicit Server(unsigned short port_num);
     void Start();
     void Stop();
     int Command(std::string const& str);

   private:
     asio::thread_pool mio;
     Acceptor mAcceptor;
 ;

 std::istream& operator>>(std::istream& is, Server& srv);

 #endif

文件Service.h

 #ifndef _HOME_SEHE_PROJECTS_***_ASIO_SERVICE_H
 #define _HOME_SEHE_PROJECTS_***_ASIO_SERVICE_H

 #include "Common.h"
 #include <iostream>
 class Service : public std::enable_shared_from_this<Service> 
   public:
     explicit Service(tcp::socket&& sock);
     void StartHandling();

   private:
     void OnRequestReceived(error_code ec, std::size_t bytes_transferred);
     std::string mRequest, mReponse;
     tcp::socket mSock;
     void OnReponseSent(error_code ec, size_t bytes_transferred);

     std::string ProcessRequest(std::string_view request);
 ;

 #endif

文件Acceptor.cpp

 #include "Acceptor.h"
 #include <utility>

 void Acceptor::Start() 
     mAcceptor.listen();
     InitAccept();
 

 void Acceptor::Stop() 
     // be thread safe
     post(mAcceptor.get_executor(), [this]  mAcceptor.cancel(); );
 

 void Acceptor::InitAccept() 
     mAcceptor.async_accept(
         make_strand(mAcceptor.get_executor()),
         [this](error_code error, tcp::socket&& sock)  OnAccept(error, std::move(sock)); );
 

 void Acceptor::OnAccept(error_code ec, tcp::socket&& sock) 
     if (!ec.failed()) 
         std::make_shared<Service>(std::move(sock))->StartHandling();
         InitAccept();
      else 
         std::cout << "OnAccept: " << ec.message() << "\n";
     
 

文件main.cpp

 #include "Server.h"
 #include <iostream>

 int main() 
     if (uint16_t port_num; std::cin >> port_num) 
         try 
             Server srv(port_num);
             std::cout << "Port - " << port_num << "\nServer start\n";
             srv.Start();

             while (std::cin >> srv)  
          catch (boost::system::system_error const& e) 
             std::cout << "Error " << e.code().message() << "\n";
         
      else 
         std::cerr << "Invalid input (port number required)\n";
     
 

文件Server.cpp

 #include "Server.h"
 #include <iomanip>

 Server::Server(unsigned short port_num)
     : mAcceptor(make_strand(mio), port_num) 

 void Server::Start()  mAcceptor.Start(); 
 void Server::Stop()  mAcceptor.Stop(); 

 int Server::Command(std::string const& cmd) 
     std::cout << "Command: " << std::quoted(cmd) << "\n";
     if (cmd == "quit") 
         Stop();
         return 1;
     
     std::cerr << "Unknown command (\"quit\" to exit)" << std::endl;
     return 0;
 

 std::istream& operator>>(std::istream& is, Server& srv) 
     std::string str;
     is >> str;
     if (srv.Command(str)) 
         is.setstate(std::ios::badbit);
     
     return is;
 

文件Service.cpp

 #include "Service.h"
 #include <utility>
 #include <iomanip>

 Service::Service(tcp::socket&& sock)
         : mSock(std::move(sock)) 

 void Service::StartHandling() 
     asio::async_read_until(
         mSock, asio::dynamic_buffer(mRequest), '\n',
         [this, self = shared_from_this()](error_code ec, std::size_t bytes_transferred) 
             OnRequestReceived(ec, bytes_transferred);
         );
 

 void Service::OnRequestReceived(error_code ec, std::size_t bytes_transferred) 
     if (ec) 
         std::cout << "OnRequestReceived: " << ec.message() << "\n";
         return;
     

     std::string_view view = mRequest;
     mReponse = ProcessRequest(view.substr(0, bytes_transferred));

     asio::async_write(
             mSock, asio::buffer(mReponse),
             [this, self = shared_from_this()](error_code ec, std::size_t bytes_transferred) 
                 OnReponseSent(ec, bytes_transferred);
             );
 

 void Service::OnReponseSent(error_code ec, std::size_t /*bytes_transferred*/) 
     if (ec) 
         std::cout << "OnReponseSent: " << ec.message() << "\n";
     
 

 #include <boost/iostreams/device/array.hpp>
 #include <boost/iostreams/stream_buffer.hpp>

 std::string Service::ProcessRequest(std::string_view request) 
     //std::cerr << "TRACE: " << std::quoted(request) << "\n";
     assert(request.back() == '\n');
     boost::iostreams::stream_buffer<boost::iostreams::array_source> buf(
         request.data(), request.size());

     std::istream input(&buf);
     std::string reponse;
     std::getline(input, reponse);
     return reponse + '\n';
 

例如使用2323 和更高版本的quit 命令运行时:

# (echo 2323; sleep 30; echo quit) | ./sotest
Port - 2323
Server start
Command: "quit"
OnAccept: Operation canceled

它确实正确地接受多个连接:

# for a in 1..10; do printf "Message with random data $RANDOM\n" | nc localhost 2323; done
Message with random data 8002
Message with random data 28046
Message with random data 17943
Message with random data 17845
Message with random data 10832
Message with random data 20049
Message with random data 27593
Message with random data 18979
Message with random data 2773
Message with random data 31159

【讨论】:

通过许多简化添加了更深入的评论。测试并确认 UBSAN/ASAN 干净:wandbox.org/permlink/vq3oFBgqB9CpbQjD 我使用了 strands - 你使用 atomic&lt;bool&gt; 的方法对我来说看起来很有效,因为你只在“逻辑链”上使用它,但请参阅 ***.com/questions/12794107/… 为方便起见,gist 中的所有文件:gist.github.com/sehe/e26c33ce9c845966a83850b2dea3f207

以上是关于我不知道为啥使用 boost asio 的代码会导致错误的主要内容,如果未能解决你的问题,请参考以下文章

websocketpp 和 boost.asio 有啥区别?

为啥这个 boost::asio::tcp::socket 可以重用?

为啥在使用 boost::asio 时每个连接都需要 strand?

为啥 boost::asio::read 缓冲区数据大小小于读取大小?

使用 boost::asio::thread_pool 的 C++ 线程池,为啥我不能重用我的线程?

为啥 Boost.Asio SSL 请求返​​回 405 Not Allowed?