提升异步 tcp 客户端

Posted

技术标签:

【中文标题】提升异步 tcp 客户端【英文标题】:boost async tcp client 【发布时间】:2012-10-20 17:19:56 【问题描述】:

我刚刚开始使用 boost。 我正在用异步套接字编写 TCP 客户端-服务器。

任务如下:

    客户端向服务器发送一个号码 客户端可以在收到服务器的答复之前发送另一个号码。 服务器接收到一个数字,用它做一些计算并将结果发送回客户端。 可以将多个客户端连接到服务器。

现在工作如下

从客户端发送一个号码到服务器 服务器在当前线程中接收一个数字并在 OnReceive 处理程序中进行计算(我知道这很糟糕……但我应该如何启动一个新线程来并行计算) 服务器发回应答,但客户端已断开连接

如何让客户端从键盘输入数字的同时等待服务器的回答?

为什么我的客户不等待服务器的答复?

客户端代码:

using boost::asio::ip::tcp;

class TCPClient

    public:
        TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter);
        void Close();

    private:
        boost::asio::io_service& m_ioservice;
        tcp::socket m_Socket;

        string m_SendBuffer;
        static const size_t m_BufLen = 100;
        char m_RecieveBuffer[m_BufLen*2];

        void OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter);
        void OnReceive(const boost::system::error_code& ErrorCode);
        void OnSend(const boost::system::error_code& ErrorCode);
        void DoClose();
;

TCPClient::TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter)
: m_IOService(IO_Service), m_Socket(IO_Service), m_SendBuffer("")

    tcp::endpoint EndPoint = *EndPointIter;

    m_Socket.async_connect(EndPoint,
        boost::bind(&TCPClient::OnConnect, this, boost::asio::placeholders::error, ++EndPointIter));


void TCPClient::Close()

    m_IOService.post(
        boost::bind(&TCPClient::DoClose, this));

void TCPClient::OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter)

    cout << "OnConnect..." << endl;
    if (ErrorCode == 0)
    
        cin >> m_SendBuffer;
        cout << "Entered: " << m_SendBuffer << endl;
        m_SendBuffer += "\0";

        m_Socket.async_send(boost::asio::buffer(m_SendBuffer.c_str(),m_SendBuffer.length()+1),
            boost::bind(&TCPClient::OnSend, this,
            boost::asio::placeholders::error));
     
    else if (EndPointIter != tcp::resolver::iterator())
    
        m_Socket.close();
        tcp::endpoint EndPoint = *EndPointIter;

        m_Socket.async_connect(EndPoint, 
            boost::bind(&TCPClient::OnConnect, this, boost::asio::placeholders::error, ++EndPointIter));
    


void TCPClient::OnReceive(const boost::system::error_code& ErrorCode)

    cout << "receiving..." << endl;
    if (ErrorCode == 0)
    
        cout << m_RecieveBuffer << endl;

        m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
            boost::bind(&TCPClient::OnReceive, this, boost::asio::placeholders::error));
     
    else 
    
        cout << "ERROR! OnReceive..." << endl;
        DoClose();
    


void TCPClient::OnSend(const boost::system::error_code& ErrorCode)

    cout << "sending..." << endl;
    if (!ErrorCode)
    
        cout << "\""<< m_SendBuffer <<"\" has been sent" << endl;
        m_SendBuffer = "";

        m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
            boost::bind(&TCPClient::OnReceive, this, boost::asio::placeholders::error));
    
    else
    
        cout << "OnSend closing" << endl;
        DoClose();
    



void TCPClient::DoClose()

    m_Socket.close();


int main()

    try 
    
        cout << "Client is starting..." << endl;
        boost::asio::io_service IO_Service;

        tcp::resolver Resolver(IO_Service);

        string port = "13";
        tcp::resolver::query Query("127.0.0.1", port);

        tcp::resolver::iterator EndPointIterator = Resolver.resolve(Query);

        TCPClient Client(IO_Service, EndPointIterator);

        cout << "Client is started!" << endl;

        cout << "Enter a query string " << endl;

        boost::thread ClientThread(boost::bind(&boost::asio::io_service::run, &IO_Service));

        Client.Close();
        ClientThread.join();
     
    catch (exception& e)
    
        cerr << e.what() << endl;
    

    cout << "\nClosing";
    getch();

这是控制台的输出

Client is starting...
Client is started!
OnConnect...
12
Entered: 12
sending...
"12" has been sent
receiving...
ERROR! OnReceive...

Closing

服务器部分

class Session

    public:
        Session(boost::asio::io_service& io_service)
            : socket_(io_service)
        
            dataRx[0] = '\0';
            dataTx[0] = '\0';
        

        tcp::socket& socket()
        
            return socket_;
        

        void start()
        
            socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
                boost::bind(&Session::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
        

        void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
        
            cout << "reading..." << endl;
            cout << "Data: " << dataRx << endl;

            if (!error)
            
                if (!isValidData())
                
                    cout << "Bad data!" << endl;
                    sprintf(dataTx, "Bad data!\0");
                    dataRx[0] = '\0';
                
                else
                
                    sprintf(dataTx, getFactorization().c_str());
                    dataRx[0] = '\0';
                

                boost::asio::async_write(socket_,
                    boost::asio::buffer(dataTx, max_length*2),
                    boost::bind(&Session::handle_write, this,
                    boost::asio::placeholders::error));
            
            else
            
                delete this;
            
        

        void handle_write(const boost::system::error_code& error)
        
            cout << "writing..." << endl;
            if (!error)
            
                cout << "dataTx sent: " << dataTx << endl;
                dataTx[0] = '\0';

                socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
                    boost::bind(&Session::handle_read, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
            
            else
            
                delete this;
            
        

        string getFactorization() const
        
            //Do something
        

        bool isValidData()
        
            locale loc; 
            for (int i = 0; i < strlen(dataRx); i++)
                if (!isdigit(dataRx[i],loc))
                    return false;

            return true;
        

    private:
        tcp::socket socket_;
        static const size_t max_length = 100;
        char dataRx[max_length];
        char dataTx[max_length*2];
;

class Server

    public:
        Server(boost::asio::io_service& io_service, short port)
            : io_service_(io_service),
            acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
        
            Session* new_session = new Session(io_service_);
            acceptor_.async_accept(new_session->socket(),
                boost::bind(&Server::handle_accept, this, new_session,
                boost::asio::placeholders::error));
        

        void handle_accept(Session* new_session, const boost::system::error_code& error)
        
            if (!error)
            
                new_session->start();
                new_session = new Session(io_service_);
                acceptor_.async_accept(new_session->socket(),
                    boost::bind(&Server::handle_accept, this, new_session,
                    boost::asio::placeholders::error));
            
            else
            
                delete new_session;
            
        

    private:
        boost::asio::io_service& io_service_;
        tcp::acceptor acceptor_;
;

int main(int argc, char* argv[])

    cout << "Server is runing..." << endl;
    try
    
        boost::asio::io_service io_service;

        int port = 13;
        Server s(io_service, port);
        cout << "Server is run!" << endl;
        io_service.run();
    
    catch (boost::system::error_code& e)
    
        std::cerr << e << "\n";
    
    catch (std::exception& e)
    
        std::cerr << "Exception: " << e.what() << "\n";
    

    return 0;

服务器的输出

Server is runing...
Server is run!
reading...
Data: 12
writing...
dataTx sent: 13    //just send back received ++number
reading...
Data:

非常感谢您的帮助

========

添加

好的,我明白了。但是检查 ErrorCode == boost::asio::error::eof 不起作用...我做错了什么?

else if (ErrorCode == boost::asio::error::eof)

    cout << "boost::asio::error::eof in OnReceive!" << endl;

else 

    cout << "ERROR! OnReceive..." << ErrorCode << endl;
    DoClose();

打印出来的是ERROR! OnReceive...system:10009好像是我的比较不正确

========

添加

我找到了根本原因。我已经声明使用async_receive(而不是async_read_some)并将main中的行交换为

ClientThread.join();
Client.Close();

现在可以正常使用了!

现在我正在尝试同时从/向套接字读取和写入数据(因为客户端应该能够在收到来自服务器的应答之前发送额外的请求。

OnConnect 函数中,我创建了 boost 线程:

boost::thread addMsgThread(boost::bind(&TCPClient::addMsgLoop, this));
boost::thread receivingThread(boost::bind(&TCPClient::startReceiving, this));
boost::thread sendingThread(boost::bind(&TCPClient::startSending, this));

有实现

void TCPClient::startReceiving()

    cout << "receiving..." << endl;
    m_RecieveBuffer[0] = '\0';
    m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
        boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error)); //runtime error here
    cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;


void TCPClient::receivingLoop(const boost::system::error_code& ErrorCode)

    cout << "receiving..." << endl;
    if (ErrorCode == 0)
    
        cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;

        m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
            boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error));
    
    else 
    
        cout << "ERROR! receivingLoop..." << ErrorCode << endl;
        DoClose();
    


void TCPClient::addMsgLoop()

    while (true)
    
        string tmp;
        cin >> tmp;

        cout << "Entered: " << tmp << endl;
        tmp += "\0";

        try
        
            msgQueue.push(tmp);
        
        catch(exception &e)
        
            cerr << "Canno add msg to send queue... " << e.what() << endl;
        
    

receivesend 线程的问题相同:运行时错误(在 boost 库中的某处写入访问冲突)。

void TCPClient::startReceiving()

     ...
     m_Socket.async_receive(); //runtime error here

在后续版本中一切正常(但我不知道如何在应答前实现多次发送)。 谁能告诉我如何解决这个问题或如何通过另一种方式实现这个问题?池化可能会有所帮助,但我现在确信这是个好方法。

【问题讨论】:

OnReceive 你必须检查你得到了什么错误,它可能会告诉你问题是什么。 当然还要检查服务器端,看看那里发生了什么。 请考虑一下:m_SendBuffer += "\0";(提示:"\0""" 有什么区别?)另外,如果你要使用 read_some,那是你的确保您收到完整的应用程序级消息。 【参考方案1】:

boost::asio::ip::tcp::socket::async_read_some 顾名思义,不能保证读取完整的数据。当客户端完成写入时,它将error 对象设置为boost::asio::error::eof

你得到的错误是因为这个:

服务器部分

        if (!error)
        
            ...
        
        else
        
            delete this;
        

else 块中,您假设这是一个错误情况并关闭连接。这并非总是如此。在else 之前,您需要检查error == boost::asio::error::eof

除了这个在读取处理程序中,你应该继续收集缓冲区中读取的任何内容,直到你点击error == boost::asio::error::eof。只有这样你才应该验证读取数据并写回客户端。

看看examples部分中的HTTP服务器1、2、3实现。

更新:回答更新后的问题

更新后的代码存在线程同步问题。

    msgQueue 被两个或多个线程同时访问,没有任何锁。 可以同时调用同一个套接字上的读取和写入。

如果我正确理解了您的问题,您希望:

    获取用户输入并将其发送到服务器。 保持同时接收服务器的响应。

您可以为这两个任务使用两个boost::asio::io_service::strands。使用 Asio 时,链是同步任务的方式。 Asio 确保在一个链中发布的任务是同步执行的。

    strand1 中发布send 任务,如下所示:read_user_input -&gt; send_to_server -&gt; handle_send -&gt; read_user_input

    strand2 中发布read 任务,如下所示:read_some -&gt; handle_read -&gt; read_some

这将确保不会从两个线程同时访问msgQueue。使用两个套接字对服务器进行读写,以确保不会在同一个套接字上调用同时读写。

【讨论】:

error == boost::asio::error::eof 不起作用(请参阅下面的问题) @Torrius,您是否将它添加到服务器和客户端?从您发布的代码来看,您似乎只添加到客户端的读取处理程序中。另请编辑您的原始问题以添加更多 cmets。不要将这些添加为另一个答案。 @Torrius,更新了我对您编辑的问题的回答。如果与原始问题无关,请创建一个新问题。

以上是关于提升异步 tcp 客户端的主要内容,如果未能解决你的问题,请参考以下文章

TCP 客户端异步套接字回调

8.swoole学习笔记--异步tcp客户端

Boost.Asio 异步 TCP 客户端和多线程

TCP 服务器应用程序中异步访问的集合类

C#里实现简单的异步TCP服务器

C#里实现简单的异步TCP服务器