boost异步tcp通信技术练习

Posted 朱迎春

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了boost异步tcp通信技术练习相关的知识,希望对你有一定的参考价值。

 

本例演示了基本的boost异步tcp服务器程序的编写。

演示程序的构思是这样的:

服务端:

l  控制台程序;

l  侦听客户端连接,当有新连接时,在屏幕上打印消息;

l  当收到客户端数据包时,在屏幕上打印出数据包的大小;

l  当客户端断开时,在屏幕上打印消息;

l  用户在窗口输入exit时,退出程序

l  程序入口:main.cpp

l  类CAsyncTcpServer,头文件CAsyncTcpServer.h,源文件CAsyncTcpServer.cpp

客户端:

l  控制台程序;

l  启动后连接服务器,进入主循环

l  等待用户输入数据包大小,向服务端发送数据包

l  输入0退出程序

 

服务端:main.cpp

#include "stdafx.h"

#include <string>

#include <iostream>

#include "AsyncTcpServer.h"

 

using namespace std;

 

class CEventHandler : public CAsyncTcpServer::IEventHandler

{

public:

    virtual void ClientConnected(int clientId)

    {

        cout << "Client: " << clientId << " connected." << endl;

    }

    virtual void ClientDisconnect(int clientId)

    {

        cout << "Client: " << clientId << " disconnected." << endl;

    }

    virtual void ReceiveData(int clientId, const BYTE* data, size_t length)

    {

        cout << "Client: " << clientId << " receive data size: " << length << endl;

    }

};

 

int _tmain(int argc, _TCHAR* argv[])

{

    string input;

 

    CAsyncTcpServer tcpServer(3, 10000);

    CEventHandler eventHandler;

 

    tcpServer.AddEventHandler(&eventHandler);

   

    while(1){

        cin >> input;

        if(input == "exit"){

            break;

        }

    }

 

    return 0;

}

 

服务端:AsyncTcpServer.h

#pragma once

#include <thread>

#include <array>

#include <boost\bind.hpp>

#include <boost\noncopyable.hpp>

#include <boost\asio.hpp>

#include <boost\asio\placeholders.hpp>

 

using namespace boost::asio;

using namespace boost::asio::ip;

using namespace std;

 

///////////////////////////////////////////////////////////////////////////////////////////////////

// 对应一个Tcp客户端连接

///////////////////////////////////////////////////////////////////////////////////////////////////

class CTcpConnection

{

public:

    CTcpConnection(io_service& ios, int clientId) : m_socket(ios), m_clientId(clientId){}

    ~CTcpConnection(){}

 

    int                     m_clientId;

    tcp::socket             m_socket;

    array<BYTE, 16 * 1024>  m_buffer;

};

typedef shared_ptr<CTcpConnection>  TcpConnectionPtr;

 

///////////////////////////////////////////////////////////////////////////////////////////////////

// 异步TCP服务器

///////////////////////////////////////////////////////////////////////////////////////////////////

class CAsyncTcpServer

    : public boost::noncopyable

{

public:

    // 事件通知接口,类似于信号槽的机制,个人更喜欢这种简单粗暴的方式

    // 想要接受通知的对象只要实现这个接口,再将接口指针通过AddEventHandler添加进来就行了

    class IEventHandler

    {

    public:

        IEventHandler(){}

        virtual ~IEventHandler(){}

        virtual void ClientConnected(int clientId) = 0;

        virtual void ClientDisconnect(int clientId) = 0;

        virtual void ReceiveData(int clientId, const BYTE* data, size_t length) = 0;

    };

public:

    CAsyncTcpServer(int maxClientNumber, int port);

    ~CAsyncTcpServer();

    void AddEventHandler(IEventHandler* pHandler){ m_EventHandlers.push_back(pHandler); }

 

    void Send(int clientId, const BYTE* data, size_t length);

 

private:

    void bind_hand_read(CTcpConnection* client);

    void handle_accept(const boost::system::error_code& error);

    void handle_read(CTcpConnection* client, const boost::system::error_code& error, size_t bytes_transferred);

 

private:

    thread              m_thread;

 

    io_service          m_ioservice;

 

    // io_service中维持一个任务队列

    // 不使用io_service::work时,调用io_service::run时,执行完任务队列中的任务,函数就返回

    // 使用io_service::work时,任务队列为空时,io_service::run会挂起等待新任务

    io_service::work    m_work;

 

    tcp::acceptor       m_acceptor;

 

    int                         m_maxClientNumber;

    int                         m_clientId;

    TcpConnectionPtr            m_nextClient;

    map<int, TcpConnectionPtr>  m_clients;

 

    vector<IEventHandler*>      m_EventHandlers;

};

服务端:AsyncTcpServer.cpp

#include "AsyncTcpServer.h"

 

///////////////////////////////////////////////////////////////////////////////////////////////////

// CAsyncTcpServer的实现

///////////////////////////////////////////////////////////////////////////////////////////////////

CAsyncTcpServer::CAsyncTcpServer(int maxClientNumber, int port)

    : m_ioservice()

    , m_work(m_ioservice)

    , m_acceptor(m_ioservice)

    , m_maxClientNumber(maxClientNumber)

    , m_clientId(0)

{

    // thread对象不能使用临时变量,否则会析构

    // io_service::run本身不创建线程,只能依托于用户创建的线程里运行

    // io_service::run有两个重载,需要显式指定使用哪个重载,这里的两个类名都不能省略

    // 可以创建线程池,实现多线程并发

    m_thread = thread((size_t(io_service::*)())&io_service::run, &m_ioservice);

 

    m_nextClient = make_shared<CTcpConnection>(m_ioservice, m_clientId);

    m_clientId++;

 

    tcp::endpoint endpoint(tcp::v4(), port);

    m_acceptor.open(endpoint.protocol());

    m_acceptor.set_option(tcp::acceptor::reuse_address(true));

    m_acceptor.bind(endpoint);

    m_acceptor.listen();

 

    // 异步等待客户端连接

    m_acceptor.async_accept(m_nextClient->m_socket, boost::bind(&CAsyncTcpServer::handle_accept, this, boost::asio::placeholders::error));

}

 

CAsyncTcpServer::~CAsyncTcpServer()

{

    for (map<int, TcpConnectionPtr>::iterator it = m_clients.begin(); it != m_clients.end(); ++it){

        it->second->m_socket.close();

    }

 

    // 让m_thread对应的线程结束,不调用这个,会一直阻塞在m_thread.join()上

    m_ioservice.stop();

 

    // 不加这一行,退出程序时会报错

    m_thread.join();

}

 

void CAsyncTcpServer::Send(int clientId, const BYTE* data, size_t length)

{

    map<int, TcpConnectionPtr>::iterator it = m_clients.find(clientId);

    if (it == m_clients.end()){

        return;

    }

    // 同步发送数据

    it->second->m_socket.write_some(boost::asio::buffer(data, length));

}

 

void CAsyncTcpServer::handle_accept(const boost::system::error_code& error)

{

    if (!error){

        // 判断连接数目是否达到最大限度

        if (m_maxClientNumber > 0 && m_clients.size() >= m_maxClientNumber){

            m_nextClient->m_socket.close();

        }

        else{

            // 发送客户端连接的消息

            for (int i = 0; i < m_EventHandlers.size(); ++i){

                m_EventHandlers[i]->ClientConnected(m_nextClient->m_clientId);

            }

 

            // 设置异步接收数据

            bind_hand_read(m_nextClient.get());

 

            // 将客户端连接放到客户表中

            m_clients.insert(make_pair(m_nextClient->m_clientId, m_nextClient));

 

            // 重置下一个客户端连接

            m_nextClient = make_shared<CTcpConnection>(m_ioservice, m_clientId);

            m_clientId++;

        }

    }

 

    // 异步等待下一个客户端连接

    m_acceptor.async_accept(m_nextClient->m_socket, boost::bind(&CAsyncTcpServer::handle_accept, this, boost::asio::placeholders::error));

}

 

void CAsyncTcpServer::bind_hand_read(CTcpConnection* client)

{

    // 客户端只要发送一个数据包,handle_read就会有响应,哪怕是1个字节

    // 客户端发送超过buffer大小的数据时,handle_read会响应前面几个满的buffer,以及最后一个不满的buffer

    client->m_socket.async_read_some(boost::asio::buffer(client->m_buffer),

        boost::bind(&CAsyncTcpServer::handle_read, this, client, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));

 

    return;

 

    // 与client->m_socket.async_read_some行为一样,仔细对比了boost头文件里的注释,除了名字不一样,其他的一毛一样

    client->m_socket.async_receive(boost::asio::buffer(client->m_buffer),

        boost::bind(&CAsyncTcpServer::handle_read, this, client, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));

 

    // 客户端发送数据,把buffer填满后,handle_read 才会响应

    // 当客户端发送超过buffer大小的数据时,handle_read 只会响应前面整数个数据包,余下的不满buffer大小的数据不会响应

    boost::asio::async_read(client->m_socket, boost::asio::buffer(client->m_buffer),

        boost::bind(&CAsyncTcpServer::handle_read, this, client, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));

}

 

void CAsyncTcpServer::handle_read(CTcpConnection* client, const boost::system::error_code& error, size_t bytes_transferred)

{

    if (!error){

        // 发送收到数据的信息

        for (int i = 0; i < m_EventHandlers.size(); ++i){

            m_EventHandlers[i]->ReceiveData(client->m_clientId, client->m_buffer.data(), bytes_transferred);

        }

 

        // 执行一次bind操作,往io_service的任务队列中加入一个任务

        // 任务执行后,需要将该任务再次加入队列

        bind_hand_read(client);

    }

    else{

        // 发送客户端离线的消息

        for (int i = 0; i < m_EventHandlers.size(); ++i){

            m_EventHandlers[i]->ClientDisconnect(client->m_clientId);

        }

        m_clients.erase(client->m_clientId);

    }

}

客户端:main.cpp

#include <iostream>

#include <boost\array.hpp>

#include <boost\bind.hpp>

#include <boost\noncopyable.hpp>

#include <boost\asio.hpp>

 

using namespace boost::asio;

using namespace boost::asio::ip;

 

using namespace std;

 

int _tmain(int argc, _TCHAR* argv[])

{

    int size;

    try{

        io_service service;

        tcp::socket socket(service);

        boost::system::error_code error;

 

        tcp::endpoint endpoint(address_v4::from_string("127.0.0.1"), 10000);

 

        socket.connect(endpoint, error);

 

        while (1){

            cin >> size;

            if (size == 0){

                break;

            }

            vector<BYTE> data(size);

            for (int j = 0; j < data.size(); ++j){

                data[j] = j%128;

            }

            socket.write_some(boost::asio::buffer(data, data.size()));

        }

    }catch(exception& e){

        string s = e.what();

    }

 

    return 0;

}

 

以上是关于boost异步tcp通信技术练习的主要内容,如果未能解决你的问题,请参考以下文章

使用 Boost Asio 在 TCP 套接字上执行异步写入操作

异步tcp通信——APM.Core 服务端概述

boost::asio::ip::tcp实现网络通信的小例子

boost::asio::tcp

C++ Boost.Asio - tcp 套接字异步写入

JAVA NIO 异步TCP服务端向客户端消息群发代码教程实战