ACE中TASK架构简介及简单应用

Posted 鸭子船长

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ACE中TASK架构简介及简单应用相关的知识,希望对你有一定的参考价值。

一、基础功能介绍

1、ACE_Message_Block*,Windows消息用MSG结构表示,ACE_Task中因为不能预计各种应用中消息的类型,所以ACE_Message_Block基本上可以理解为是对一个指针的封装,这个指针指向实际的一块内存或是一个对象等等。在创建ACE_Message_Block时,可以指定是由ACE_Message_Block来管理内存(构造函数中指定一个 size_t类型的大小),还是由我们自己管理内存(构造函数中指定一个指针)。而一个ACE_Message_Block类型的指针,就是一个消息,我们通过传递它来进行逻辑的业务处理。

其包含如下两个成员变量:

1 /// Misc flags (e.g., DONT_DELETE and USER_FLAGS).
2   ACE_Message_Block::Message_Flags flags_;
3 
4   /// Pointer To beginning of message payload.
5   char *base_;

其中flags_表示数据删除标志,有效取值为DONT_DELETE。对flags_标志的作用描述如下:数据指针base_的值有两种来源,一种是由应用程序传入,在这种情况下,应该将flags_设置为DONT_DELETE,告诉框架当删除ACE_Data_Block对象时,不要删除该指针,应由应用程序自己处理;第二种是该指针由框架申请和释放,应用程序无须关注。通过这样的设计可以提高数据结构在使用上的灵活性。

2、ACE_Task::putq,事实上,到底用SendMessage还是PostMessage与ACE_Task::putq来进行类比,我很为难,PostMessage发送一个消息后立刻返回,这与通常的ACE_Task::putq行为非常类似,因为ACE_Task是运行在另外一个线程上,ACE_Task::putq只是完成将消息插入到消息队列的工作,理论上它应该立刻返回,但实际上,ACE_Task的消息队列有容量大小限制,这个限制由我们自己限定,当当前消息队列满时,ACE_Task::putq将阻塞一直到可以插入,这时候就比较类似与SendMessage,

3、ACE_Task::getq,

1 ACE_Message_Block * msg;
2 while(getq(msg) != -1)    // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0);
3 {
4     // process msg here
5 }

4、消息处理函数,默认没有提供,svc

二、要搭架一个基于ACE_Task的消息系统,通常要做如下的步骤:

1、编写一个派生自ACE_Task的类,指定它的同步模式
ACE_Task的消息队列可以由多个处理线程共享使用,所以需要提供同步模式,例如 ACE_MT_SYNCH和ACE_NULL_SYNCH分别表示基于多线程的同步和不使用同步,这个参数是ACE_Task的一个模板参数。

其分别声明了不同的锁。

 1 /**
 2  * @class ACE_NULL_SYNCH
 3  *
 4  * @brief Implement a do nothing Synchronization wrapper that
 5  *        typedefs the @c ACE_Condition and @c ACE_Mutex to the
 6  *        @c Null* versions.
 7  */
 8 class ACE_Export ACE_NULL_SYNCH
 9 {
10 public:
11   typedef ACE_Null_Mutex MUTEX;
12   typedef ACE_Null_Mutex NULL_MUTEX;
13   typedef ACE_Null_Mutex PROCESS_MUTEX;
14   typedef ACE_Null_Mutex RECURSIVE_MUTEX;
15   typedef ACE_Null_Mutex RW_MUTEX;
16   typedef ACE_Null_Condition CONDITION;
17   typedef ACE_Null_Condition RECURSIVE_CONDITION;
18   typedef ACE_Null_Semaphore SEMAPHORE;
19   typedef ACE_Null_Mutex NULL_SEMAPHORE;
20 };
21 
22 /**
23  * @class ACE_MT_SYNCH
24  *
25  * @brief Implement a default thread safe synchronization wrapper that
26  *        typedefs the @c ACE_Condition and @c ACE_Mutex to the
27  * @c ACE_Condition and @c ACE_Mutex versions.
28  *
29  * @todo This should be a template, but SunC++ 4.0.1 complains about
30  *       this.
31  */
32 class ACE_Export ACE_MT_SYNCH
33 {
34 public:
35   typedef ACE_Thread_Mutex MUTEX;
36   typedef ACE_Null_Mutex NULL_MUTEX;
37   typedef ACE_Process_Mutex PROCESS_MUTEX;
38   typedef ACE_Recursive_Thread_Mutex RECURSIVE_MUTEX;
39   typedef ACE_RW_Thread_Mutex RW_MUTEX;
40   typedef ACE_Condition_Thread_Mutex CONDITION;
41   typedef ACE_Condition_Recursive_Thread_Mutex RECURSIVE_CONDITION;
42   typedef ACE_Thread_Semaphore SEMAPHORE;
43   typedef ACE_Null_Semaphore NULL_SEMAPHORE;
44 };

 

1 class My_Task : public ACE_Task<ACE_MT_SYNCH>
2 {
3 public:
4     virtual int svc();
5 }

2、重载 ACE_Task的 svc 方法,编写消息循环相关的代码

1 int My_Task::svc()
2 {
3     ACE_Message_Block * msg;
4     while(getq(msg) != -1)    // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0);
5     {
6         // process msg here
7     }
8 }

svc 方法相当与处理线程的入口方法。

3、假设 My_Task是一个基于ACE_Task的类,创建一个唯一的My_Task实例,这个可以通过typedef ACE_Singleton<MyTask, SYNCH_METHOD> MYTASK;然后总是使用MYTASK::instance方法来获取一个My_Task的指针来完成。

4、在适当位置(一般是程序开始的时候),让My_Task开始工作

MYTASK::intance()->activate(
THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , // 线程创建的属性
n_threads = 1, // 线程的数目,即有多少处理线程
...)

5、在有消息发生的时候发送消息

1 ACE_Message_Block * msg;
2 // fill the msg
3 ...
4 MYTASK::intance()->putq(msg);

三、简单示例

生产者消费者示例

  1 /*************************************************************************
  2     > File Name: task.cpp
  3     > Author: 
  4     > Mail: 
  5     > Created Time: Tue 10 Oct 2017 02:49:52 PM CST
  6  ************************************************************************/
  7 
  8 #include <ace/Synch.h>
  9 #include <ace/Task.h>
 10 #include <ace/Message_Block.h>
 11 
 12 char test_message[] = "test_message";
 13 #define MAX_MESSAGES 10
 14 class Counting_Test_Producer : public ACE_Task<ACE_MT_SYNCH>
 15 {
 16     public:
 17         Counting_Test_Producer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
 18         :ACE_Task<ACE_MT_SYNCH>(0,queue) {}
 19         virtual int svc (void);
 20 };
 21 
 22 int Counting_Test_Producer::svc (void)
 23 {
 24     int produced = 0;
 25     char data[256] = {0};
 26     ACE_Message_Block * b = 0;
 27 
 28     while(1)
 29     {
 30         ACE_OS::sprintf(data, "%s--%d.\\n", test_message, produced);
 31 
 32         //创建消息块
 33         ACE_NEW_NORETURN (b, ACE_Message_Block (256));
 34         if (b == 0)
 35         {
 36             break;
 37         }
 38         
 39         //将data中的数据复制到消息块中
 40         b->copy(data, 256);
 41         if (produced >= MAX_MESSAGES)
 42         {
 43             //如果是最后一个数据,那么将数据属性设置为MB_STOP
 44             b->msg_type(ACE_Message_Block::MB_STOP);
 45 
 46             //将消息块放入队列中
 47             if (this->putq(b) == -1)
 48             {
 49                 b->release();
 50                 break;
 51             }
 52             produced ++;
 53             ACE_DEBUG((LM_DEBUG, ACE_TEXT("Producer put the data: %s.\\n"), b->base()));
 54             break;
 55         }
 56         if (this->putq(b) == -1)
 57         {
 58             b->release();
 59             break;
 60         }
 61         produced ++;
 62 
 63         ACE_DEBUG((LM_DEBUG, ACE_TEXT("Producer put the data: %s.\\n"), b->base()));
 64         ACE_OS::sleep(1);
 65     }
 66     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Producer done\\n")));
 67     return 0;
 68 }
 69 
 70 class Counting_Test_Consumer : public ACE_Task<ACE_MT_SYNCH>
 71 {
 72     public:
 73         Counting_Test_Consumer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
 74         :ACE_Task<ACE_MT_SYNCH> (0, queue){}
 75         virtual int svc(void);
 76 };
 77 
 78 int Counting_Test_Consumer::svc(void)
 79 {
 80     int consumer = 0;
 81     ACE_Message_Block *b = 0;
 82     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("in consumer svc.\\n")));
 83     ACE_OS::sleep(30);
 84     while(1)
 85     {
 86         //循环从队列中读取数据块,如果读取失败,那么退出线程
 87         if (this->getq(b) == -1)
 88         {
 89             break;
 90         }
 91         if (b->msg_type() == ACE_Message_Block::MB_STOP)
 92         {
 93             //如果消息属性是MB_STOP,那么表示其为最后一个数据
 94             ACE_DEBUG((LM_DEBUG, ACE_TEXT("Consumer get the data: %s.\\n"), b->base()));
 95             ACE_DEBUG((LM_DEBUG, ACE_TEXT("Consumer get the stop msg.\\n")));
 96             b->release();
 97             consumer++;
 98             break;
 99         }
100         ACE_DEBUG((LM_DEBUG, ACE_TEXT("Consumer get the data: %s.\\n"), b->base()));
101         b->release();
102         consumer++;
103         ACE_OS::sleep(5);
104     }
105 
106     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Consumer done\\n")));
107     return 0;
108 }
109 
110 int ACE_MAIN(int argc, ACE_TCHAR *argv[])
111 {
112     //创建消息队列
113     ACE_Message_Queue<ACE_MT_SYNCH> queue(2*1024*1024);
114 
115     // 创建生产者和消费者,它们使用同一个消息队列,只有这样才能实现线程间消息的传递
116     Counting_Test_Producer producer(&queue);
117     Counting_Test_Consumer consumer(&queue);
118 
119     //调用activate函数创建消费者线程
120     if (consumer.activate(THR_NEW_LWP | THR_DETACHED | THR_INHERIT_SCHED, 1) == -1)
121     {
122         ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT("Consumers %p\\n"), ACE_TEXT("activate")), -1);
123     }
124 
125     //调用activate函数创建生产者线程
126     if (producer.activate ( THR_NEW_LWP | THR_DETACHED | THR_INHERIT_SCHED, 1) == -1)
127     {
128         ACE_ERROR((LM_ERROR, ACE_TEXT("Producers %p\\n"), ACE_TEXT("activate")));
129         consumer.wait();
130         return -1;
131     }
132     //调用wait函数等待线程结束
133     ACE_Thread_Manager::instance()->wait();
134     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Ending test!\\n")));
135     return 0;
136 }

问题1:

In file included from /usr/include/ace/config-macros.h:24,
                 from /usr/include/ace/config-lite.h:24,
                 from /usr/include/ace/ACE_export.h:11,
                 from /usr/include/ace/Shared_Object.h:18,
                 from /usr/include/ace/Service_Object.h:17,
                 from /usr/include/ace/Task.h:17,
                 from task.cpp:9:
/usr/include/ace/config.h:20:2: error: #error "_FILE_OFFSET_BITS != 64"

编译时添加-D_FILE_OFFSET_BITS=64

以上是关于ACE中TASK架构简介及简单应用的主要内容,如果未能解决你的问题,请参考以下文章

阿里云上海ACE同城会 | 数据库前沿技术解读及行业应用

ACE_Task::putq(转)

Ace Editor 手动添加片段

ACE代码框架总结

微服务架构的环境搭建及简单测试

阿里云新版云计算架构师ACE认证专家解读会重磅来袭