提升异步 tcp 客户端
Posted
技术标签:
【中文标题】提升异步 tcp 客户端【英文标题】:boost async tcp client 【发布时间】:2012-10-20 17:19:56 【问题描述】:我刚刚开始使用 boost。 我正在用异步套接字编写 TCP 客户端-服务器。
任务如下:
-
客户端向服务器发送一个号码
客户端可以在收到服务器的答复之前发送另一个号码。
服务器接收到一个数字,用它做一些计算并将结果发送回客户端。
可以将多个客户端连接到服务器。
现在工作如下
从客户端发送一个号码到服务器 服务器在当前线程中接收一个数字并在 OnReceive 处理程序中进行计算(我知道这很糟糕……但我应该如何启动一个新线程来并行计算) 服务器发回应答,但客户端已断开连接如何让客户端从键盘输入数字的同时等待服务器的回答?
为什么我的客户不等待服务器的答复?
客户端代码:
using boost::asio::ip::tcp;
class TCPClient
public:
TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter);
void Close();
private:
boost::asio::io_service& m_ioservice;
tcp::socket m_Socket;
string m_SendBuffer;
static const size_t m_BufLen = 100;
char m_RecieveBuffer[m_BufLen*2];
void OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter);
void OnReceive(const boost::system::error_code& ErrorCode);
void OnSend(const boost::system::error_code& ErrorCode);
void DoClose();
;
TCPClient::TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter)
: m_IOService(IO_Service), m_Socket(IO_Service), m_SendBuffer("")
tcp::endpoint EndPoint = *EndPointIter;
m_Socket.async_connect(EndPoint,
boost::bind(&TCPClient::OnConnect, this, boost::asio::placeholders::error, ++EndPointIter));
void TCPClient::Close()
m_IOService.post(
boost::bind(&TCPClient::DoClose, this));
void TCPClient::OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter)
cout << "OnConnect..." << endl;
if (ErrorCode == 0)
cin >> m_SendBuffer;
cout << "Entered: " << m_SendBuffer << endl;
m_SendBuffer += "\0";
m_Socket.async_send(boost::asio::buffer(m_SendBuffer.c_str(),m_SendBuffer.length()+1),
boost::bind(&TCPClient::OnSend, this,
boost::asio::placeholders::error));
else if (EndPointIter != tcp::resolver::iterator())
m_Socket.close();
tcp::endpoint EndPoint = *EndPointIter;
m_Socket.async_connect(EndPoint,
boost::bind(&TCPClient::OnConnect, this, boost::asio::placeholders::error, ++EndPointIter));
void TCPClient::OnReceive(const boost::system::error_code& ErrorCode)
cout << "receiving..." << endl;
if (ErrorCode == 0)
cout << m_RecieveBuffer << endl;
m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
boost::bind(&TCPClient::OnReceive, this, boost::asio::placeholders::error));
else
cout << "ERROR! OnReceive..." << endl;
DoClose();
void TCPClient::OnSend(const boost::system::error_code& ErrorCode)
cout << "sending..." << endl;
if (!ErrorCode)
cout << "\""<< m_SendBuffer <<"\" has been sent" << endl;
m_SendBuffer = "";
m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
boost::bind(&TCPClient::OnReceive, this, boost::asio::placeholders::error));
else
cout << "OnSend closing" << endl;
DoClose();
void TCPClient::DoClose()
m_Socket.close();
int main()
try
cout << "Client is starting..." << endl;
boost::asio::io_service IO_Service;
tcp::resolver Resolver(IO_Service);
string port = "13";
tcp::resolver::query Query("127.0.0.1", port);
tcp::resolver::iterator EndPointIterator = Resolver.resolve(Query);
TCPClient Client(IO_Service, EndPointIterator);
cout << "Client is started!" << endl;
cout << "Enter a query string " << endl;
boost::thread ClientThread(boost::bind(&boost::asio::io_service::run, &IO_Service));
Client.Close();
ClientThread.join();
catch (exception& e)
cerr << e.what() << endl;
cout << "\nClosing";
getch();
这是控制台的输出
Client is starting...
Client is started!
OnConnect...
12
Entered: 12
sending...
"12" has been sent
receiving...
ERROR! OnReceive...
Closing
服务器部分
class Session
public:
Session(boost::asio::io_service& io_service)
: socket_(io_service)
dataRx[0] = '\0';
dataTx[0] = '\0';
tcp::socket& socket()
return socket_;
void start()
socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
boost::bind(&Session::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
cout << "reading..." << endl;
cout << "Data: " << dataRx << endl;
if (!error)
if (!isValidData())
cout << "Bad data!" << endl;
sprintf(dataTx, "Bad data!\0");
dataRx[0] = '\0';
else
sprintf(dataTx, getFactorization().c_str());
dataRx[0] = '\0';
boost::asio::async_write(socket_,
boost::asio::buffer(dataTx, max_length*2),
boost::bind(&Session::handle_write, this,
boost::asio::placeholders::error));
else
delete this;
void handle_write(const boost::system::error_code& error)
cout << "writing..." << endl;
if (!error)
cout << "dataTx sent: " << dataTx << endl;
dataTx[0] = '\0';
socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
boost::bind(&Session::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
else
delete this;
string getFactorization() const
//Do something
bool isValidData()
locale loc;
for (int i = 0; i < strlen(dataRx); i++)
if (!isdigit(dataRx[i],loc))
return false;
return true;
private:
tcp::socket socket_;
static const size_t max_length = 100;
char dataRx[max_length];
char dataTx[max_length*2];
;
class Server
public:
Server(boost::asio::io_service& io_service, short port)
: io_service_(io_service),
acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
Session* new_session = new Session(io_service_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&Server::handle_accept, this, new_session,
boost::asio::placeholders::error));
void handle_accept(Session* new_session, const boost::system::error_code& error)
if (!error)
new_session->start();
new_session = new Session(io_service_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&Server::handle_accept, this, new_session,
boost::asio::placeholders::error));
else
delete new_session;
private:
boost::asio::io_service& io_service_;
tcp::acceptor acceptor_;
;
int main(int argc, char* argv[])
cout << "Server is runing..." << endl;
try
boost::asio::io_service io_service;
int port = 13;
Server s(io_service, port);
cout << "Server is run!" << endl;
io_service.run();
catch (boost::system::error_code& e)
std::cerr << e << "\n";
catch (std::exception& e)
std::cerr << "Exception: " << e.what() << "\n";
return 0;
服务器的输出
Server is runing...
Server is run!
reading...
Data: 12
writing...
dataTx sent: 13 //just send back received ++number
reading...
Data:
非常感谢您的帮助
========
添加
好的,我明白了。但是检查 ErrorCode == boost::asio::error::eof 不起作用...我做错了什么?
else if (ErrorCode == boost::asio::error::eof)
cout << "boost::asio::error::eof in OnReceive!" << endl;
else
cout << "ERROR! OnReceive..." << ErrorCode << endl;
DoClose();
打印出来的是ERROR! OnReceive...system:10009
好像是我的比较不正确
========
添加
我找到了根本原因。我已经声明使用async_receive
(而不是async_read_some
)并将main
中的行交换为
ClientThread.join();
Client.Close();
现在可以正常使用了!
现在我正在尝试同时从/向套接字读取和写入数据(因为客户端应该能够在收到来自服务器的应答之前发送额外的请求。
在 OnConnect
函数中,我创建了 boost 线程:
boost::thread addMsgThread(boost::bind(&TCPClient::addMsgLoop, this));
boost::thread receivingThread(boost::bind(&TCPClient::startReceiving, this));
boost::thread sendingThread(boost::bind(&TCPClient::startSending, this));
有实现
void TCPClient::startReceiving()
cout << "receiving..." << endl;
m_RecieveBuffer[0] = '\0';
m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error)); //runtime error here
cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;
void TCPClient::receivingLoop(const boost::system::error_code& ErrorCode)
cout << "receiving..." << endl;
if (ErrorCode == 0)
cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;
m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error));
else
cout << "ERROR! receivingLoop..." << ErrorCode << endl;
DoClose();
void TCPClient::addMsgLoop()
while (true)
string tmp;
cin >> tmp;
cout << "Entered: " << tmp << endl;
tmp += "\0";
try
msgQueue.push(tmp);
catch(exception &e)
cerr << "Canno add msg to send queue... " << e.what() << endl;
receive
和 send
线程的问题相同:运行时错误(在 boost 库中的某处写入访问冲突)。
void TCPClient::startReceiving()
...
m_Socket.async_receive(); //runtime error here
在后续版本中一切正常(但我不知道如何在应答前实现多次发送)。 谁能告诉我如何解决这个问题或如何通过另一种方式实现这个问题?池化可能会有所帮助,但我现在确信这是个好方法。
【问题讨论】:
在OnReceive
你必须检查你得到了什么错误,它可能会告诉你问题是什么。
当然还要检查服务器端,看看那里发生了什么。
请考虑一下:m_SendBuffer += "\0";
(提示:"\0"
和 ""
有什么区别?)另外,如果你要使用 read_some
,那是你的确保您收到完整的应用程序级消息。
【参考方案1】:
boost::asio::ip::tcp::socket::async_read_some 顾名思义,不能保证读取完整的数据。当客户端完成写入时,它将error
对象设置为boost::asio::error::eof
。
你得到的错误是因为这个:
服务器部分
if (!error)
...
else
delete this;
在else
块中,您假设这是一个错误情况并关闭连接。这并非总是如此。在else
之前,您需要检查error == boost::asio::error::eof
。
除了这个在读取处理程序中,你应该继续收集缓冲区中读取的任何内容,直到你点击error == boost::asio::error::eof
。只有这样你才应该验证读取数据并写回客户端。
看看examples部分中的HTTP服务器1、2、3实现。
更新:回答更新后的问题
更新后的代码存在线程同步问题。
msgQueue
被两个或多个线程同时访问,没有任何锁。
可以同时调用同一个套接字上的读取和写入。
如果我正确理解了您的问题,您希望:
-
获取用户输入并将其发送到服务器。
保持同时接收服务器的响应。
您可以为这两个任务使用两个boost::asio::io_service::strands。使用 Asio 时,链是同步任务的方式。 Asio 确保在一个链中发布的任务是同步执行的。
在strand1
中发布send
任务,如下所示:read_user_input -> send_to_server -> handle_send -> read_user_input
在strand2
中发布read
任务,如下所示:read_some -> handle_read -> read_some
这将确保不会从两个线程同时访问msgQueue
。使用两个套接字对服务器进行读写,以确保不会在同一个套接字上调用同时读写。
【讨论】:
error == boost::asio::error::eof
不起作用(请参阅下面的问题)
@Torrius,您是否将它添加到服务器和客户端?从您发布的代码来看,您似乎只添加到客户端的读取处理程序中。另请编辑您的原始问题以添加更多 cmets。不要将这些添加为另一个答案。
@Torrius,更新了我对您编辑的问题的回答。如果与原始问题无关,请创建一个新问题。以上是关于提升异步 tcp 客户端的主要内容,如果未能解决你的问题,请参考以下文章