C++ 中的高效消息工厂和处理程序

Posted

技术标签:

【中文标题】C++ 中的高效消息工厂和处理程序【英文标题】:Efficient message factory and handler in C++ 【发布时间】:2016-10-25 02:45:13 【问题描述】:

我们公司正在用 C++11 重写大部分遗留的 C 代码。 (这也意味着我是一名学习 C++ 的 C 程序员)。我需要有关消息处理程序的建议。

我们有分布式系统 - 服务器进程通过 TCP 向客户端进程发送打包消息。

在 C 代码中这样做: - 根据类型和子类型解析消息,它们始终是前 2 个字段

- call a handler as handler[type](Message *msg)

- handler creates temporary struct say, tmp_struct to hold the parsed values and .. 

- calls subhandler[type][subtype](tmp_struct)

每个类型/子类型只有一个处理程序。

迁移到 C++11 和多线程环境。我的基本想法是-

1) 为每个类型/子类型组合注册一个处理器对象。这是 实际上是向量的向量- 向量

class MsgProcessor 

    // Factory function
    virtual Message *create();
    virtual Handler(Message *msg)

这将被不同的消息处理器继承

class AMsgProcessor : public MsgProcessor 

      Message *create() override();
      handler(Message *msg);

2) 通过查找向量向量来获取处理器。 使用重载的 create() 工厂函数获取消息。 这样我们就可以将实际消息和解析后的值保留在消息中。

3) 现在有点hack,这个消息应该被发送到其他线程进行繁重的处理。为了避免再次在向量中查找,在消息中添加了一个指向 proc 的指针。

class Message 
    const MsgProcessor *proc; // set to processor, 
                              // which we got from the first lookup
                              // to get factory function.
;

所以其他线程,就这样吧

Message->proc->Handler(Message *);

这看起来很糟糕,但希望这将有助于将消息处理程序与工厂分开。这适用于多个类型/子类型想要创建相同的消息,但处理方式不同的情况。

我正在搜索这个并遇到了:

http://www.drdobbs.com/cpp/message-handling-without-dependencies/184429055?pgno=1

它提供了一种将消息与处理程序完全分离的方法。但我想知道我上面的简单方案是否会被认为是可接受的设计。这也是实现我想要的错误的方法吗?

与速度一样,效率是此应用程序最重要的要求。我们已经在做几个内存 Jumbs => 2 个向量 + 虚函数调用来创建消息。对处理程序有两个尊重,我猜从缓存的角度来看这不是很好。

【问题讨论】:

无意冒犯 我认为您没有清楚地描述您的用例然后这么快就跳到您的设计......至少您可能需要解决对什么样的负载的性能期望?你介意写一篇关于这个的简介吗? 我不知道你说的负载是什么意思。在我们的系统中,我们必须每秒处理 200k 条消息。无需深入了解实际的产品细节。服务器向客户端发送简短的配置消息。配置消息带有类型、子类型和数据。有一个很大的数字。类型/子类型组合 - 说大约 1000。这些由不同的编号消耗。的模块。因此,为所有模块注册和接收消息提供一个干净的接口非常重要。 你希望什么时候调用'create'方法?返回的 'Message*' 是什么?您将使用已解析的数据填充它会是一条空消息吗? 【参考方案1】:

虽然您的要求不清楚,但我想我的设计可能是您正在寻找的。​​p>

查看http://coliru.stacked-crooked.com/a/f7f9d5e7d57e6261 了解完整示例。

它有以下组件:

    消息处理器的接口类IMessageProcessor。 表示消息的基类。 Message 一个注册类,它本质上是一个单例,用于存储对应于 (Type, Subtype) 对的消息处理器。 Registrator。它将映射存储在unordered_map 中。您也可以稍微调整一下以获得更好的性能。 Registrator 的所有公开 API 都受 std::mutex 保护。 MessageProcessor 的具体实现。 AMsgProcessorBMsgProcessor 在这种情况下。 simulate 函数显示它们是如何组合在一起的。

在这里也粘贴代码:

/*
 * http://***.com/questions/40230555/efficient-message-factory-and-handler-in-c
 */

#include <iostream>
#include <vector>
#include <tuple>
#include <mutex>
#include <memory>
#include <cassert>
#include <unordered_map>

class Message;

class IMessageProcessor

public:
  virtual Message* create() = 0;
  virtual void handle_message(Message*) = 0;
  virtual ~IMessageProcessor() ;
;

/*
 * Base message class
 */
class Message

public:
  virtual void populate() = 0;
  virtual ~Message() ;
;

using Type = int;
using SubType = int;
using TypeCombo = std::pair<Type, SubType>;
using IMsgProcUptr = std::unique_ptr<IMessageProcessor>;

/*
 * Registrator class maintains all the registrations in an
 * unordered_map.
 * This class owns the MessageProcessor instance inside the
 * unordered_map.
 */
class Registrator

public:
  static Registrator* instance();

  // Diable other types of construction
  Registrator(const Registrator&) = delete;
  void operator=(const Registrator&) = delete;

public:
  // TypeCombo assumed to be cheap to copy
  template <typename ProcT, typename... Args>
  std::pair<bool, IMsgProcUptr> register_proc(TypeCombo typ, Args&&... args)
  
    auto proc = std::make_unique<ProcT>(std::forward<Args>(args)...);
    bool ok;
    
      std::lock_guard<std::mutex> _(lock_);
      std::tie(std::ignore, ok) = registrations_.insert(std::make_pair(typ, std::move(proc)));
    
    return (ok == true) ? std::make_pair(true, nullptr) : 
                          // Return the heap allocated instance back
                          // to the caller if the insert failed.
                          // The caller now owns the Processor
                          std::make_pair(false, std::move(proc));
  

  // Get the processor corresponding to TypeCombo
  // IMessageProcessor passed is non-owning pointer
  // i.e the caller SHOULD not delete it or own it
  std::pair<bool, IMessageProcessor*> processor(TypeCombo typ)
  
    std::lock_guard<std::mutex> _(lock_);

    auto fitr = registrations_.find(typ);
    if (fitr == registrations_.end()) 
      return std::make_pair(false, nullptr);
    
    return std::make_pair(true, fitr->second.get());
  

  // TypeCombo assumed to be cheap to copy
  bool is_type_used(TypeCombo typ)
  
    std::lock_guard<std::mutex> _(lock_);
    return registrations_.find(typ) != registrations_.end();
  

  bool deregister_proc(TypeCombo typ)
  
    std::lock_guard<std::mutex> _(lock_);
    return registrations_.erase(typ) == 1;
  

private:
  Registrator() = default;

private:
  std::mutex lock_;
  /*
   * Should be replaced with a concurrent map if at all this
   * data structure is the main contention point (which I find
   * very unlikely).
   */
  struct HashTypeCombo
  
  public:
    std::size_t operator()(const TypeCombo& typ) const noexcept
    
      return std::hash<decltype(typ.first)>()(typ.first) ^ 
             std::hash<decltype(typ.second)>()(typ.second);
    
  ;

  std::unordered_map<TypeCombo, IMsgProcUptr, HashTypeCombo> registrations_;
;

Registrator* Registrator::instance()

  static Registrator inst;
  return &inst;
  /*
   * OR some other DCLP based instance creation
   * if lifetime or creation of static is an issue
   */



// Define some message processors

class AMsgProcessor final : public IMessageProcessor

public:
  class AMsg final : public Message 
  
  public:
    void populate() override 
      std::cout << "Working on AMsg\n";
    

    AMsg() = default;
    ~AMsg() = default;
  ;

  Message* create() override
  
    std::unique_ptr<AMsg> ptr(new AMsg);
    return ptr.release();
  

  void handle_message(Message* msg) override
  
    assert (msg);
    auto my_msg = static_cast<AMsg*>(msg);

    //.... process my_msg ?
    //.. probably being called in some other thread
    // Who owns the msg ??
    (void)my_msg; // only for suppressing warning

    delete my_msg;

    return;
  

  ~AMsgProcessor();
;

AMsgProcessor::~AMsgProcessor()



class BMsgProcessor final : public IMessageProcessor

public:
  class BMsg final : public Message
  
  public:
    void populate() override 
      std::cout << "Working on BMsg\n";
    

    BMsg() = default;
    ~BMsg() = default;
  ;

  Message* create() override
  
    std::unique_ptr<BMsg> ptr(new BMsg);
    return ptr.release();
  

  void handle_message(Message* msg) override
  
    assert (msg);
    auto my_msg = static_cast<BMsg*>(msg);

    //.... process my_msg ?
    //.. probably being called in some other thread
    //Who owns the msg ??
    (void)my_msg; // only for suppressing warning

    delete my_msg;

    return;
  

  ~BMsgProcessor();
;

BMsgProcessor::~BMsgProcessor()




TypeCombo read_from_network()

  return 1, 2;



struct ParsedData 
;

Message* populate_message(Message* msg, ParsedData& pdata)

  // Do something with the message
  // Calling a dummy populate method now
  msg->populate();
  (void)pdata;
  return msg;


void simulate()

  TypeCombo typ = read_from_network();
  bool ok;
  IMessageProcessor* proc = nullptr;

  std::tie(ok, proc) = Registrator::instance()->processor(typ);
  if (!ok) 
    std::cerr << "FATAL!!!" << std::endl;
    return;
  

  ParsedData parsed_data;
  //..... populate parsed_data here ....

  proc->handle_message(populate_message(proc->create(), parsed_data));
  return;



int main() 

  /*
   * TODO: Not making use or checking the return types after calling register
   * its a must in production code!!
   */
  // Register AMsgProcessor
  Registrator::instance()->register_proc<AMsgProcessor>(std::make_pair(1, 1));
  Registrator::instance()->register_proc<BMsgProcessor>(std::make_pair(1, 2));

  simulate();

  return 0;

更新 1

这里造成混乱的主要原因似乎是因为偶数系统的架构是未知的。

任何自尊的事件系统架构都如下所示:

    轮询套接字描述符的线程池。 用于处理定时器相关事件的线程池。 执行长阻塞作业的线程数量(取决于应用程序)相对较少。

所以,在你的情况下:

    您将在执行epoll_waitselectpoll 的线程上获得网络事件。 完全读取数据包并使用Registrator::get_processor 调用获取处理器。 注意get_processor 调用可以在没有任何锁定的情况下进行,前提是可以保证底层的unordered_map 不会被修改,即一旦我们开始接收事件就不会进行新的插入。 使用获得的处理器,我们可以获得Message 并填充它。 现在,这部分我不太确定您希望它是怎样的。此时,我们有processor,您可以在其上调用handle_message,或者从当前线程(即正在执行epoll_wait 的线程)或通过将作业(处理器和消息)发布到该线程来将其分派到另一个线程接收队列。

【讨论】:

感谢您的回答。我有一个指向处理器的指针的原因是,我在问题中提到了这一点,我们需要在不同的线程中调用处理程序。收到并解析后的消息,发送到其他线程进行处理。该处理非常耗时,因为它涉及大量 PCI 写入等。我们可以设计所有线程接收消息并处理它,但这需要某种锁定。因此,我们将其设计为管道。您认为在消息中保留指向 proc 的指针可以吗? 这会产生某种循环依赖,处理器应该看到消息,而消息反过来应该看到处理器。可能通过前向声明解决。在我的原始代码中,我让每个模块都实例化处理器。处理器的基类,比如 IMessageProcessor,将自身添加到 Registrator。注册/注销将通过新/删除来完成。这样我就不必处理派生类上的构造函数参数。您通过使用可变参数模板参数解决了它。 我认为我的方案的缺点是注册错误必须作为异常而不是返回类型来处理。你对这两个方案有意见吗? @user361190 我已经更新了答案,因为在这里评论它令人沮丧。 感谢 cmets!

以上是关于C++ 中的高效消息工厂和处理程序的主要内容,如果未能解决你的问题,请参考以下文章

讨论:php 中的高效实时聊天应用程序? [关闭]

SAP CRM 高效调试方法

Easy3D:一个轻量级易用高效的C++库,用于处理和渲染3D数据

cinatra--一个高效易用的c++ http框架

C++异常处理:掌握高效健壮代码的秘密武器

硬件高效的OpenGL