如何实现消息机制

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何实现消息机制相关的知识,希望对你有一定的参考价值。

步骤/方法
1
  BEGIN_MESSAGE_MAP(CInheritClass, CBaseClass)
  //AFX_MSG_MAP(CInheritClass)
  //AFX_MSG_MAP
  END_MESSAGE_MAP()
  这里主要进行消息映射的实现和消息处理函数的实现。
  所有能够进行消息处理的类都是基于CCmdTarget类的,也就是说CCmdTarget类是所有可以进行消息处理类的父类。CCmdTarget类是MFC处理命令消息的基础和核心。
  同时MFC定义了下面的两个主要结构:
  AFX_MSGMAP_ENTRY
  struct AFX_MSGMAP_ENTRY
  
  UINT nMessage; // windows message
  UINT nCode; // control code or WM_NOTIFY code
  UINT nID;
  // control ID (or 0 for windows messages)
  UINT nLastID;
  // used for entries specifying a range of control id\'s
  UINT nSig;
  // signature type (action) or pointer to message #
  AFX_PMSG pfn; // routine to call (or special value)
  ;
  和AFX_MSGMAP
  struct AFX_MSGMAP
  
  #ifdef _AFXDLL
  const AFX_MSGMAP* (PASCAL* pfnGetBaseMap)();
  #else
  const AFX_MSGMAP* pBaseMap;
  #endif
  const AFX_MSGMAP_ENTRY* lpEntries;
  ;
  其中AFX_MSGMAP_ENTRY结构包含了
  一个消息的所有相关信息,其中
  nMessage为Windows消息的ID号
  nCode为控制消息的通知码
  nID为Windows控制消息的ID
  nLastID表示如果是一个指定范围的消息被映射的话,
  nLastID用来表示它的范围。
  nSig表示消息的动作标识
  AFX_PMSG pfn 它实际上是一个指向
  和该消息相应的执行函数的指针。

2
  而AFX_MSGMAP主要作用是两个,一:用来得到基类的消息映射入口地址。二:得到本身的消息映射入口地址。
  实际上,MFC把所有的消息一条条填入到AFX_MSGMAP_ENTRY结构中去,形成一个数组,该数组存放了所有的消息和与它们相关的参数。同时通过 AFX_MSGMAP能得到该数组的首地址,同时得到基类的消息映射入口地址,这是为了当本身对该消息不响应的时候,就调用其基类的消息响应。
  现在我们来分析MFC是如何让窗口过程来处理消息的,实际上所有MFC的窗口类都通过钩子函数_AfxCbtFilterHook截获消息,并且在钩子函 数_AfxCbtFilterHook中把窗口过程设定为AfxWndProc。原来的窗口过程保存在成员变量m_pfnSuper中。
  所以在MFC框架下,一般一个消息的处理过程是这样的。
  函数AfxWndProc接收Windows操作系统发送的消息。
  函数AfxWndProc调用函数AfxCallWndProc进行消息处理,这里一个进步是把对句柄的操作转换成对CWnd对象的操作。
  函 数AfxCallWndProc调用CWnd类的方法WindowProc进行消息处理。注意AfxWndProc和AfxCallWndProc都是 AFX的API函数。而WindowProc已经是CWnd的一个方法。所以可以注意到在WindowProc中已经没有关于句柄或者是CWnd的参数 了。

3
  方法WindowProc调用方法OnWndMsg进行正式的消息处理,即把消息派送到相关的方法中去处理。消息是如何派送的呢? 实际上在CWnd类中都保存了一个AFX_MSGMAP的结构,而在AFX_MSGMAP结构中保存有所有我们用ClassWizard生成的消息的数组 的入口,我们把传给OnWndMsg的message和数组中的所有的message进行比较,找到匹配的那一个消息。实际上系统是通过函数 AfxFindMessageEntry来实现的。找到了那个message,实际上我们就得到一个AFX_MSGMAP_ENTRY结构,而我们在上面 已经提到AFX_MSGMAP_ENTRY保存了和该消息相关的所有信息,其中主要的是消息的动作标识和跟消息相关的执行函数。然后我们就可以根据消息的 动作标识调用相关的执行函数,而这个执行函数实际上就是通过ClassWizard在类实现中定义的一个方法。这样就把消息的处理转化到类中的一个方法的 实现上。举一个简单的例子,比如在View中对WM_LButtonDown消息的处理就转化成对如下一个方法的操作。
  void CInheritView::OnLButtonDown
  (UINT nFlags, CPoint point)
  
  // TODO: Add your message
  handler code here and/or call default
  CView::OnLButtonDown(nFlags, point);
  
  注 意这里CView::OnLButtonDown(nFlags, point)实际上就是调用CWnd的Default()方法。 而Default()方法所做的工作就是调用DefWindowProc对消息进行处理。这实际上是调用原来的窗口过程进行缺省的消息处理。
  如果OnWndMsg方法没有对消息进行处理的话,就调用DefWindowProc对消息进行处理。这是实际上是调用原来的窗口过程进行缺省的消息处理。
  所以如果正常的消息处理的话,MFC窗口类是完全脱离了原来的窗口过程,用自己的一套体系结构实现消息的映射和处理。即先调用MFC窗口类挂上去的窗口过 程,再调用原先的窗口过程。并且用户面对和消息相关的参数不再是死板的wParam和lParam,而是和消息类型具体相关的参数。比如和消息 WM_LbuttonDown相对应的方法OnLButtonDown的两个参数是nFlags和point。nFlags表示在按下鼠标左键的时候是否 有其他虚键按下,point更简单,就是表示鼠标的位置。

4
  同时MFC窗口类消息传递中还提供了两个函数,分别为WalkPreTranslateTree和PreTranslateMessage。我们知道利用 MFC框架生成的程序,都是从CWinApp开始执行的,而CWinapp实际继承了CWinThread类。在CWinThread的运行过程中会调用 窗口类中的WalkPreTranslateTree方法。而WalkPreTranslateTree方法实际上就是从当前窗口开始查找愿意进行消息翻 译的类,直到找到窗口没有父类为止。在WalkPreTranslateTree方法中调用了PreTranslateMessage方法。实际上 PreTranslateMessage最大的好处是我们在消息处理前可以在这个方法里面先做一些事情。举一个简单的例子,比如我们希望在一个CEdit 对象里,把所有的输入的字母都以大写的形式出现。我们只需要在PreTranslateMessage方法中判断message是否为WM_CHAR,如 果是的话,把wParam(表示键值)由小写字母的值该为大写字母的值就实现了这个功能。
  继续上面的例子,根据我们对MFC消息机制的分析,我们很容易得到除了上面的方法,我们至少还可以在另外两个地方进行操作。
  在消息的处理方法里面即OnChar中,当然最后我们不再调用CEdit::OnChar(nChar, nRepCnt, nFlags),而是直接调用DefWindowProc(WM_CHAR,nChar,MAKELPARAM (nRepCnt,nFlags))。因为从我们上面的分析可以知道CEdit::OnChar(nChar, nRepCnt, nFlags)实际上也就是对DefWindowProc方法的调用。
  我们可以直接重载DefWindowProc方法,对message类型等于WM_CHAR的,直接修改nChar的值即可。

  

转载仅供参考,版权属于原作者。祝你愉快,满意请采纳哦
参考技术A

消息系统对于一个win32程序来说十分重要,它是一个程序运行的动力源泉。一个消息,是系统定义的一个32位的值,他唯一的定义了一个事件,向 Windows发出一个通知,告诉应用程序某个事情发生了。例如,单击鼠标、改变窗口尺寸、按下键盘上的一个键都会使Windows发送一个消息给应用程序。
    消息本身是作为一个记录传递给应用程序的,这个记录中包含了消息的类型以及其他信息。例如,对于单击鼠标所产生的消息来说,这个记录中包含了单击鼠标时的坐标。这个记录类型叫做MSG,MSG含有来自windows应用程序消息队列的消息信息,它在Windows中声明如下:

typedef struct tagMsg

       HWND    hwnd;       //接受该消息的窗口句柄
       UINT    message;    //消息常量标识符,也就是我们通常所说的消息号
       WPARAM  wParam;     //32位消息的特定附加信息,确切含义依赖于消息值
       LPARAM  lParam;     //32位消息的特定附加信息,确切含义依赖于消息值
       DWORD   time;       //消息创建时的时间
       POINT   pt;         //消息创建时的鼠标/光标在屏幕坐标系中的位置
MSG;


    消息可以由系统或者应用程序产生。系统在发生输入事件时产生消息。举个例子, 当用户敲键, 移动鼠标或者单击控件。系统也产生消息以响应由应用程序带来的变化, 比如应用程序改变系统字体改变窗体大小。应用程序可以产生消息使窗体执行任务,或者与其他应用程序中的窗口通讯。

用redis实现消息队列(实时消费+ack机制)

消息队列

首先做简单的引入。

MQ主要是用来:

  • 解耦应用、
  • 异步化消息
  • 流量削峰填谷

目前使用的较多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。
网上的资源对各种情况都有详细的解释,在此不做过多赘述。本文
仅介绍如何使用Redis实现轻量级MQ的过程。

为什么要用Redis实现轻量级MQ?

在业务的实现过程中,就算没有大量的流量,解耦和异步化几乎也是处处可用,此时MQ就显得尤为重要。但与此同时MQ也是一个蛮重的组件,例如我们如果用RabbitMQ就必须为它搭建一个服务器,同时如果要考虑可用性,就要为服务端建立一个集群,而且在生产如果有问题也需要查找功能。在中小型业务的开发过程中,可能业务的其他整个实现都没这个重。过重的组件服务会成倍增加工作量。
所幸的是,Redis提供的list数据结构非常适合做消息队列。
但是如何实现即时消费?如何实现ack机制?这些是实现的关键所在。

如何实现即时消费?

网上所流传的方法是使用Redis中list的操作BLPOP或BRPOP,即列表的阻塞式(blocking)弹出。
让我们来看看阻塞式弹出的使用方式:

BRPOP key [key ...] timeout

此命令的说明是:

1、当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。 
2、当给定多个key参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。

另外,BRPOP 除了弹出元素的位置和 BLPOP 不同之外,其他表现一致。

以此来看,列表的阻塞式弹出有两个特点:

1、如果list中没有任务的时候,该连接将会被阻塞
2、连接的阻塞有一个超时时间,当超时时间设置为0时,即可无限等待,直到弹出消息

由此看来,此方式是可行的,但此为传统的观察者模式,业务简单则可使用,如A的任务只由B去执行。但如果A和Z的任务,B和C都能执行,那使用这种方式就相形见肘。这个时候就应该使用订阅/发布模式,使业务系统更加清晰。
好在Redis也支持Pub/Sub(发布/订阅)。在消息A入队list的同时发布(PUBLISH)消息B到频道channel,此时已经订阅channel的worker就接收到了消息B,知道了list中有消息A进入,即可循环lpop或rpop来消费list中的消息。流程如下:

技术图片

其中的worker可以是单独的线程,也可以是独立的服务,其充当了Consumer和业务处理者角色。下面做实例说明。

即时消费实例

示例场景为:worker要做同步文件功能,等到有文件生成时立马同步。

首先开启一个线程代表worker,来订阅频道channel:

  1.  
    @Service
  2.  
    public class SubscribeService {
  3.  
     
  4.  
    @Resource
  5.  
    private RedisService redisService;
  6.  
    @Resource
  7.  
    private SynListener synListener;//订阅者
  8.  
     
  9.  
    @PostConstruct
  10.  
    public void subscribe() {
  11.  
    new Thread(new Runnable() {
  12.  
     
  13.  
    @Override
  14.  
    public void run() {
  15.  
    LogCvt.info("服务已订阅频道:{}", channel);
  16.  
    redisService.subscribe(synListener, channel);
  17.  
    }
  18.  
    }).start();
  19.  
     
  20.  
    }
  21.  
    }

代码中的SynListener即为所声明的订阅者,channel为订阅的频道名称,具体的订阅逻辑如下:

  1.  
    @Service
  2.  
    public class SynListener extends JedisPubSub {
  3.  
     
  4.  
    @Resource
  5.  
    private DispatchMessageHandler dispatchMessageHandler;
  6.  
     
  7.  
    @Override
  8.  
    public void onMessage(String channel, String message) {
  9.  
    LogCvt.info("channel:{},receives message:{}",channel,message);
  10.  
    try {
  11.  
    //处理业务(同步文件)
  12.  
    dispatchMessageHandler.synFile();
  13.  
    } catch (Exception e) {
  14.  
    LogCvt.error(e.getMessage(),e);
  15.  
    }
  16.  
    }
  17.  
    }

处理业务的时候,就去list中去消费消息:

  1.  
    @Service
  2.  
    public class DispatchMessageHandler {
  3.  
     
  4.  
    @Resource
  5.  
    private RedisService redisService;
  6.  
    @Resource
  7.  
    private MessageHandler messageHandler;
  8.  
     
  9.  
    public void synFile(){
  10.  
    while(true){
  11.  
    try {
  12.  
    String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key());
  13.  
    if (null == message){
  14.  
    break;
  15.  
    }
  16.  
    Thread.currentThread().setName(Tools.uuid());
  17.  
    // 队列数据处理
  18.  
    messageHandler.synfile(message);
  19.  
    } catch (Exception e) {
  20.  
    LogCvt.error(e.getMessage(),e);
  21.  
    }
  22.  
    }
  23.  
    }
  24.  
     
  25.  
    }

这样我们就达到了消息的实时消费的目的。

如何实现ack机制?

ack,即消息确认机制(Acknowledge)。

首先来看RabbitMQ的ack机制:

  • Publisher把消息通知给Consumer,如果Consumer已处理完任务,那么它将向Broker发送ACK消息,告知某条消息已被成功处理,可以从队列中移除。如果Consumer没有发送回ACK消息,那么Broker会认为消息处理失败,会将此消息及后续消息分发给其他Consumer进行处理(redeliver flag置为true)。
  • 这种确认机制和TCP/IP协议确立连接类似。不同的是,TCP/IP确立连接需要经过三次握手,而RabbitMQ只需要一次ACK。
  • 值的注意的是,RabbitMQ当且仅当检测到ACK消息未发出且Consumer的连接终止时才会将消息重新分发给其他Consumer,因此不需要担心消息处理时间过长而被重新分发的情况。

那么在我们用Redis实现消息队列的ack机制的时候该怎么做呢?
需要注意两点:

  1. work处理失败后,要回滚消息到原始pending队列
  2. 假如worker挂掉,也要回滚消息到原始pending队列

上面第一点可以在业务中完成,即失败后执行回滚消息。

实现方案

(该方案主要解决worker挂掉的情况)

  1. 维护两个队列:pending队列和doing表(hash表)。
  2. workers定义为ThreadPool。
  3. 由pending队列出队后,workers分配一个线程(单个worker)去处理消息——给目标消息append一个当前时间戳和当前线程名称,将其写入doing表,然后该worker去消费消息,完成后自行在doing表擦除信息。
  4. 启用一个定时任务,每隔一段时间去扫描doing队列,检查每隔元素的时间戳,如果超时,则由worker的ThreadPoolExecutor去检查线程是否存在,如果存在则取消当前任务执行,并把事务rollback。最后把该任务从doing队列中pop出,再重新push进pending队列。
  5. 在worker的某线程中,如果处理业务失败,则主动回滚,并把任务从doing队列中移除,重新push进pending队列。

总结

Redis作为消息队列是有很大局限性的。因为其主要特性及用途决定它只能实现轻量级的消息队列。写在最后:没有绝对好的技术,只有对业务最友好的技术,谨此献给所有developer。

以上是关于如何实现消息机制的主要内容,如果未能解决你的问题,请参考以下文章

JAVA Eclipse中如何简易的实现消息机制

现在流行的消息推送机制是怎么实现的

Android异步消息机制

07. RabbitMQ消息成功确认机制

用redis实现消息队列(实时消费+ack机制)

用redis实现消息队列(实时消费+ack机制)