网络客户端模拟器设计

Posted

技术标签:

【中文标题】网络客户端模拟器设计【英文标题】:Network client simulator design 【发布时间】:2011-01-22 11:58:42 【问题描述】:

我正在尝试用 c++ 设计一个软件,它将使用 **UDP 协议**发送请求字节(遵循标准的**应用程序级别**协议,其要填充的字段将从文本文件中获取)。

现在这个客户端必须能够以非常高的速率发送这些请求..最多 **2000 个事务/秒** 并且如果它在指定的超时时间内也应该收到响应,否则不会收到它

我将对所有套接字使用 boost 库,但我不确定它的设计是否适用于如此高速的应用程序:(

我想我必须使用高度多线程的应用程序(将再次使用 Boost)。我对吗 ?我是否必须为每个请求创建一个单独的线程?但是我认为只有一个线程必须等待接收响应,否则如果许多线程正在等待响应,我们如何区分哪些线程请求我们得到了响应!

希望这个问题很清楚。我只需要一些关于我可能面临的设计点和可疑问题的帮助。

【问题讨论】:

我不是这个领域的专家,所以我不会发布答案,但是:不,你不一定需要多线程(尽管它可能有用),不,你当然不想每秒创建 2000 个线程。考虑使用大约 2*#CPU 内核的线程数的预分叉方法。 【参考方案1】:

目前我自己的网络客户端已经完成了一半,所以也许我可以提供一些建议和一些资源来看看。在这方面有更多的经验,希望他们能加入:)

首先,你是关于提升的。一旦你习惯了它是如何连接在一起的,boost::asio 是一个很棒的用于编写网络代码的工具包。本质上,您创建一个io_service 并调用run 执行直到所有工作完成,或runOne 执行单个IO 操作。就他们自己而言,这并没有太大帮助。当您在自己的循环中运行 runOne 时,力量来自:

boost::asio::io_service myioservice;
while(true)

    myIOService.runOne();

,或在一个(或多个)线程上运行run 函数:

boost::thread t(boost::bind(&boost::asio::io_service::run, &myIOService));

但是,值得注意的是,run 在没有工作可做时立即返回(因此您可以与该线程说再见)。正如我在 *** 上发现的那样,诀窍是确保它总是有事可做。解决方法在boost::asio::io_service::work

boost::asio::io_service::work myWork(myIOService);   // just some abstract "work"

上面的代码确保你的线程在没有任何事情发生时不会停止。我认为这是维持生命的一种手段:)

在某些时候,您会想要创建一个套接字并将其连接到某个地方。我创建了一个通用 Socket 类(并从中派生了一个文本套接字以创建缓冲输入)。我还想要一个非常类似于 C# 的基于事件的系统。我在下面为您概述了这些内容:

第一步,我们需要一种通用的方法来传递参数,因此,EventArgs

eventArgs.h

 class EventArgs : boost::noncopyable
 
 private:

 public:
  EventArgs();
  virtual ~EventArgs() = 0;
 ; // eo class EventArgs:

现在,我们需要一个人们可以订阅/取消订阅的事件类:

event.h

// STL
#include <functional>
#include <stack>

// Boost
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>

 // class Event
 class Event : boost::noncopyable
 
 public:
  typedef std::function<void(const EventArgs&)> DelegateType;
  typedef boost::shared_ptr<DelegateType> DelegateDecl;

 private:
  boost::mutex m_Mutex;
  typedef std::set<DelegateDecl> DelegateSet;
  typedef std::stack<DelegateDecl> DelegateStack;
  typedef DelegateSet::const_iterator DelegateSet_cit;
  DelegateSet m_Delegates;
  DelegateStack m_ToRemove;

 public:
  Event()
  
  ; // eo ctor


  Event(Event&& _rhs) : m_Delegates(std::move(_rhs.m_Delegates))
  
  ; // eo mtor

  ~Event()
  
  ; // eo dtor

  // static methods
  static DelegateDecl bindDelegate(DelegateType _f)
  
   DelegateDecl ret(new DelegateType(_f));
   return ret;
  ; // eo bindDelegate

  // methods
  void raise(const EventArgs& _args)
  
   boost::mutex::scoped_lock lock(m_Mutex);

   // get rid of any we have to remove
   while(m_ToRemove.size())
   
    m_Delegates.erase(m_Delegates.find(m_ToRemove.top()));
    m_ToRemove.pop();
   ;

   if(m_Delegates.size())
   std::for_each(m_Delegates.begin(),
        m_Delegates.end(),
        [&_args](const DelegateDecl& _decl)  (*_decl)(_args); );
  ; // eo raise

  DelegateDecl addListener(DelegateDecl _decl)
  
   boost::mutex::scoped_lock lock(m_Mutex);
   m_Delegates.insert(_decl);
   return _decl;
  ; // eo addListener

  DelegateDecl addListener(DelegateType _f)
  
   DelegateDecl ret(bindDelegate(_f));
   return addListener(ret);
  ; // eo addListener


  void removeListener(const DelegateDecl _decl)
  
   boost::mutex::scoped_lock lock(m_Mutex);
   DelegateSet_cit cit(m_Delegates.find(_decl));
   if(cit != m_Delegates.end())
    m_ToRemove.push(_decl);
  ; // eo removeListener

  // operators

  // Only use operator += if you don't which to manually detach using removeListener
  Event& operator += (DelegateType _f)
  
   addListener(_f);
   return *this;
  ; // eo op +=

 ; // eo class Event

然后,是时候创建一个套接字类了。以下是标题:

socket.h

(一些注释:ByteVectortypedef std::vector&lt;unsigned char&gt;

#pragma once

#include "event.h"

// boost
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/buffer.hpp>
  // class Socket
  class MORSE_API Socket : boost::noncopyable
  
  protected:
   typedef boost::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;

  private:
   ByteVector      m_Buffer;   // will be used to read in

   SocketPtr        m_SocketPtr;
   boost::asio::ip::tcp::endpoint      m_RemoteEndPoint;
   bool         m_bConnected;

   // reader
   void _handleConnect(const boost::system::error_code& _errorCode, boost::asio::ip::tcp::resolver_iterator _rit);
   void _handleRead(const boost::system::error_code& _errorCode, std::size_t read);
  protected:

   SocketPtr socket()  return m_SocketPtr; ;
  public:
   Socket(ByteVector_sz _bufSize = 512);
   virtual ~Socket();

   // properties
   bool isConnected() const  return m_bConnected; ;
   const boost::asio::ip::tcp::endpoint& remoteEndPoint() const return m_RemoteEndPoint; ;

   // methods
   void connect(boost::asio::ip::tcp::resolver_iterator _rit);
   void connect(const String& _host, const Port _port);
   void close();

   // Events
   Event onRead;
   Event onResolve;
   Event onConnect;
   Event onClose;
  ; // eo class Socket

然后,现在实施。您会注意到它调用另一个类来执行 DNS 解析。之后我会证明这一点。还有一些EventArg-derivatives 我已经省略了。当套接字事件发生时,它们只是作为 EventArg 参数传递。

socket.cpp

#include "socket.h"


// boost
#include <boost/asio/placeholders.hpp>

namespace morse

 namespace net
 
  // ctor
  Socket::Socket(ByteVector_sz _bufSize /* = 512 */) : m_bConnected(false)
  
   m_Buffer.resize(_bufSize);
  ; // eo ctor

  // dtor
  Socket::~Socket()
  
  ; // eo dtor


  // _handleRead
  void Socket::_handleRead(const boost::system::error_code& _errorCode,
            std::size_t _read)
  
   if(!_errorCode)
   
    if(_read)
    
     onRead.raise(SocketReadEventArgs(*this, m_Buffer, _read));
     // read again
     m_SocketPtr->async_read_some(boost::asio::buffer(m_Buffer), boost::bind(&Socket::_handleRead, this, _1, _2));
    ;
   
   else
    close();
  ; // eo _handleRead


  // _handleConnect
  void Socket::_handleConnect(const boost::system::error_code& _errorCode,
         boost::asio::ip::tcp::resolver_iterator _rit)
  
   m_bConnected = !_errorCode;
   bool _raise(false);
   if(!_errorCode)
   
    m_RemoteEndPoint = *_rit;
    _raise = true;
    m_SocketPtr->async_read_some(boost::asio::buffer(m_Buffer), boost::bind(&Socket::_handleRead, this, _1, _2));
   
   else if(++_rit != boost::asio::ip::tcp::resolver::iterator())
   
    m_SocketPtr->close();
    m_SocketPtr->async_connect(*_rit, boost::bind(&Socket::_handleConnect, this, boost::asio::placeholders::error, _rit));
   
   else
    _raise = true; // raise complete failure

   if(_raise)
    onConnect.raise(SocketConnectEventArgs(*this, _errorCode));

  ; // eo _handleConnect


  // connect
  void Socket::connect(boost::asio::ip::tcp::resolver_iterator _rit)
  
   boost::asio::ip::tcp::endpoint ep(*_rit);
   m_SocketPtr.reset(new boost::asio::ip::tcp::socket(Root::instance().ioService()));
   m_SocketPtr->async_connect(ep, boost::bind(&Socket::_handleConnect, this, boost::asio::placeholders::error, _rit));
  ;


  void Socket::connect(const String& _host, Port _port)
  
   // Anon function for resolution of the host-name and asynchronous calling of the above
   auto anonResolve = [this](const boost::system::error_code& _errorCode, 
           boost::asio::ip::tcp::resolver_iterator _epIt)
   
    // raise event
    onResolve.raise(SocketResolveEventArgs(*this, !_errorCode ? (*_epIt).host_name() : String(""), _errorCode));

    // perform connect, calling back to anonymous function
    if(!_errorCode)
     this->connect(_epIt);
   ;

   // Resolve the host calling back to anonymous function
   Root::instance().resolveHost(_host, _port, anonResolve);

  ; // eo connect


  void Socket::close()
  
   if(m_bConnected)
   
    onClose.raise(SocketCloseEventArgs(*this));
    m_SocketPtr->close();
    m_bConnected = false;
   ;
   // eo close

正如我所说的 DNS 解析,Root::instance().resolveHost(_host, _port, anonResolve); 行调用它来执行异步 DNS:

  // resolve a host asynchronously
  template<typename ResolveHandler>
  void resolveHost(const String& _host, Port _port, ResolveHandler _handler)
  
   boost::asio::ip::tcp::endpoint ret;
   boost::asio::ip::tcp::resolver::query query(_host, boost::lexical_cast<std::string>(_port));
   m_Resolver.async_resolve(query, _handler);
  ; // eo resolveHost

最后,我需要一个基于文本的套接字,它在每次收到我的行时都会引发一个事件(然后对其进行处理)。这次我将省略头文件,只显示实现文件。不用说,它声明了一个名为 onLineEvent,它会在每次收到完整的一行时触发:

// boost
#include <boost/asio/buffer.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/placeholders.hpp>

namespace morse

 namespace net
 
  String TextSocket::m_DefaultEOL("\r\n");

  // ctor
  TextSocket::TextSocket() : m_EOL(m_DefaultEOL)
  
   onRead += boost::bind(&TextSocket::readHandler, this, _1);
  ; // eo ctor


  // dtor
  TextSocket::~TextSocket()
  
  ; // eo dtor


  // readHandler
  void TextSocket::readHandler(const EventArgs& _args)
  
   auto& args(static_cast<const SocketReadEventArgs&>(_args));
   m_LineBuffer.append(args.buffer().begin(), args.buffer().begin() + args.bytesRead());
   String::size_type pos;
   while((pos = m_LineBuffer.find(eol())) != String::npos)
   
    onLine.raise(SocketLineEventArgs(*this, m_LineBuffer.substr(0, pos)));
    m_LineBuffer = m_LineBuffer.substr(pos + eol().length());
   ;
  ; // eo readHandler


  // writeHandler
  void TextSocket::writeHandler(const boost::system::error_code& _errorCode, std::size_t _written)
  
   if(!_errorCode)
   
    m_Queue.pop_front();
    if(!m_Queue.empty()) // more to do?
     boost::asio::async_write(*socket().get(), boost::asio::buffer(m_Queue.front(), m_Queue.front().length()), boost::bind(&TextSocket::writeHandler, this, _1, _2));
   
   else
    close();
  ; // eo writeHandler

  void TextSocket::sendLine(String _line)
  
   Root::instance().ioService().post(boost::bind(&TextSocket::_sendLine, this, _line));
  ; // eo sendLine


  // _sendLine
  void TextSocket::_sendLine(String _line)
  
   // copy'n'queue
   _line.append(m_EOL);
   m_Queue.push_back(_line);
   if(m_Queue.size() == 1) // previously an empty queue, must start write!
    boost::asio::async_write(*socket().get(), boost::asio::buffer(m_Queue.front(), m_Queue.front().length()), boost::bind(&TextSocket::writeHandler, this, _1, _2));
  ; // eo sendLine

关于上述类的一些注意事项......它使用boost::asio::post 发送行。这允许它全部发生在 ASIO 以线程安全的方式管理的线程上,并允许我们排队等待发送的行。这使得它非常可扩展。

我确信还有更多问题,也许我的代码没有帮助。我花了几天时间将它们拼凑起来并理解它,我怀疑它实际上是否有任何好处。希望一些更好的头脑会浏览它并去“HOLY CRAP,这个

【讨论】:

@ArunMu,正如 small_duck 在他的回复中所说,您可以启动多个线程(基本上,我的代码中启动 boost 线程的行?还有更多!)。我会从一个线程开始,看看性能如何,如果需要可以扩大规模。我会从一个 boost 线程开始,发布到一个处理线程并查看吞吐量是什么样的,也许会添加另一个线程和其他机制来提高性能。 @ArunMu。我有一个名为Root(惊喜!)的根对象,它既充当我的客户端类的容器(又包含上面提到的套接字)。它还包含boost::asio::io_service 实例。在构造函数中,它初始化一些工作(将该 io-service 传递给 ctor),并使用 io_servicerun 方法启动一个新线程。当 root 完成构建后,io_service 启动并运行,并且由于添加了 work,它很高兴坐在那里,直到我给它做更多事情。 (更多即将到来)。 @ArunMu,实际上我只使用互斥锁来允许从任何地方修改事件处理程序(例如,假设您收到一些数据并希望在获取时添加或删除您的处理程序。 .. 您在 boost 线程上接收数据.. 如果 UI 线程正在添加处理程序怎么办?)。在实际发送数据时,我使用boost::asio::io_service::post 方法确保在它创建的线程上调用指定的函数,因此不需要互斥体。 @ArunMu,没问题。另外,请注意,我的代码使用单个线程来执行所有网络 I/O。在您的情况下,对于多个线程,您将需要考虑在您的发布/接收处理程序上使用链,以确保单个函数调用一次不会被多个线程访问。一旦你了解了所有这些东西,这一切都是有道理的,但我承认一开始就相当令人生畏:) @ArunMu,太棒了 :) 快乐的异步编码!【参考方案2】:

我不确定您是否需要使用“繁重”的多线程。大多数高速应用程序使用操作系统的轮询机制,通常比线程更好地扩展。

架构很大程度上取决于您的应用程序需要的反应性,即哪些组件负责生成输入和输出以及进行实际处理。

使用 boost::asio 解决问题的一种方法是使用一个通信线程来运行 boost::asio::io_service::run 方法。 io_service 侦听各种 UDP 套接字,并在消息到达时对其进行处理,可能会将它们发送到队列中,以便应用程序可以在主线程中处理它们。在主线程中,您可以将消息发布到 io_services 以供主系统发送。

这应该可以让您在没有太多困难的情况下达到每秒 2000 条消息。

另一种方法是通过从多个线程多次调用 boost::asio::io_service::run 方法来启动多个通信线程,从而允许它们的通信线程并行处理消息。

不过,给 Asio 一个忠告:由于它的异步架构,如果你按照它的逻辑去使用它,它会更好地工作。如果您发现自己使用了很多锁并管理了很多线程,那么您可能做错了。仔细查看各种方法的线程安全保证,并研究提供的示例。

【讨论】:

感谢您的意见。我将尝试了解 boost asio 的精髓。如果我不需要使用线程,即使我会很高兴:).. 除了标准文档之外,我们还有其他好的资源来理解 boost asio 吗?

以上是关于网络客户端模拟器设计的主要内容,如果未能解决你的问题,请参考以下文章

如何在本地端口上模拟网络延迟?

网易考拉Android客户端网络模块设计

网易考拉Android客户端网络模块设计

从模拟令牌进行网络登录

在将本地机器人连接到本地 WebChat 客户端时获取 403(尽管适用于模拟器)

Fiddler快速入门(还有一个功能就是不经过网络,直接模拟一个响应返回给客户端)