rt_thread的消息队列
Posted 旭日初扬
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rt_thread的消息队列相关的知识,希望对你有一定的参考价值。
目录
前言
消息队列是一种异步通信的方式,用于线程线程,线程与中断之间的信息交换。
一、概念
- 通过消息队列服务,线程或中断服务例程可以将一条或多条消息放入消息队列中。
- 一个或多个线程可以从消息队列中获得消息。当有多个消息发送到 消息队列时,通常是将先进入消息队列的消息先传给线程,也就是说,线程先得 到的是最先进入消息队列的消息,即先进先出原则(FIFO)。
- RT-Thread 中 的消息队列支持优先级,也就是说在所有等待消息的线程中优先级最高的会先获 得消息。
- 消息支持先进先出方式排队与优先级排队方式,支持异步读写工作方式。读队列支持超时机制。
- 支持发送紧急消息,这里的紧急消息是往队列头发送消息。
- 可以允许不同长度(不超过队列节点最大值)的任意类型消息。
- 一个线程能够从任意一个消息队列接收和发送消息。
- 多个线程能够从同一个消息队列接收和发送消息。
- -当队列使用结束后,需要通过删除队列操作释放内存函数回收。消息队列可以应用于发送不定长消息的场合,包括线程与线程间的消息交换,以及在中断服务函数中给线程发送消息(中断服务例程不可能接收消息)。
- 当消息队列不再被使用时,应该删除它以释放系统资源,一旦操作完成,消 息队列将被永久性的删除。
二、实例
1、消息队列控制块
#ifdef RT_USING_MESSAGEQUEUE
/**
* message queue structure
* 消息队列控制块
*/
struct rt_messagequeue
// 内核 对象类型的成员,通过这个成员可以将消息队列挂到系统对象容器里面
struct rt_ipc_object parent; /**< inherit from ipc_object */
// 存放消息的消息池开始地址
void *msg_pool; /**< start address of message queue */
// 每条消息大小,消息队列中也就是节点的大小,单位为字节
rt_uint16_t msg_size; /**< message size of each message */
// 能够容纳的最大消息数量
rt_uint16_t max_msgs; /**< max number of messages */
// 队列中的消息索引,记录消息队列的消息个数。
rt_uint16_t entry; /**< index of messages in the queue */
// 链表头指针,指向即将读取数据的节点
void *msg_queue_head; /**< list head */
// 链表尾指针,指向允许写入数据的节点
void *msg_queue_tail; /**< list tail */
// 指向队列的空闲节点的指针
void *msg_queue_free; /**< pointer indicated the free node of queue */
;
typedef struct rt_messagequeue *rt_mq_t;
#endif
2、消息队列创建函数
/**
* This function will create a message queue object from system resource
*
* @param name the name of message queue
* @param msg_size the size of message
* @param max_msgs the maximum number of message in queue
* @param flag the flag of message queue
*
* @return the created message queue, RT_NULL on error happen
*/
rt_mq_t rt_mq_create(const char *name, // 消息队列名称
rt_size_t msg_size, // 大小
rt_size_t max_msgs, // 容量
rt_uint8_t flag)
// 结构体指针
struct rt_messagequeue *mq;
struct rt_mq_message *head;
register rt_base_t temp;
// 中断状态
RT_DEBUG_NOT_IN_INTERRUPT;
/* allocate object 为创建的消息队列分配一个消息队列的对象,并且命名对象 名称,且名称唯一 */
mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name);
if (mq == RT_NULL)
return mq;
/* set parent */
// 设置消息队列的阻塞唤醒模式
// 使用 RT_IPC_FLAG_PRIO 优先级 flag 创建的 IPC 对象,在多个线程等待消息队列资源时,将由优先级高的线程优先获得资源.
// 使用 RT_IPC_FLAG_FIFO 先进先出 flag 创建的 IPC 对象,在多个线程等待消 息队列资源时,将按照先来先得的顺序获得资源
mq->parent.parent.flag = flag;
/* init ipc object */
// 获取消息队列大小
// 初始化一个链表,用于记 录访问此队列而阻塞的线程,通过这个链表,可以找到对应的阻塞线程的控制块, 从而能恢复线程
rt_ipc_object_init(&(mq->parent));
/* init message queue */
/* get correct message size */
// 设置消息队列的大小与容量
mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE);
mq->max_msgs = max_msgs;
/* allocate message pool */
// 给此消息队列分配内存。这块内存的大小为[消息大小+消息 头大小]与消息队列容量的乘积,每个消息节点中都有一个消息头,用于链表链 接,指向下一个消息节点,作为消息的排序
mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) * mq->max_msgs);
if (mq->msg_pool == RT_NULL)
rt_mq_delete(mq);
return RT_NULL;
/* init message list */
mq->msg_queue_head = RT_NULL;
mq->msg_queue_tail = RT_NULL;
/* init message empty list */
mq->msg_queue_free = RT_NULL;
for (temp = 0; temp < mq->max_msgs; temp ++) // 将所有的消息队列的节点连接起来,形成空闲链表
head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +
temp * (mq->msg_size + sizeof(struct rt_mq_message)));
head->next = mq->msg_queue_free;
mq->msg_queue_free = head;
/* the initial entry is zero */
// 消息队列清0
mq->entry = 0;
return mq;
RTM_EXPORT(rt_mq_create);
2.1、创建消息队列
// 定义消息队列控制块
static rt_mq_t test_mq = RT_NULL;
test_mq = rt_mq_create(
"test_mq" // 消息队列名称
64 // 消息队列的最低长度
30 // 消息队列的最大容量
RT_IPC_FLAG_FIFO // 队列模式
);
// 使用 RT_IPC_FLAG_PRIO 优先级 flag 创建的 IPC 对象,在多个线程等待消息队列资源时,将由优先级高的线程优先获得资源
// 使用 RT_IPC_FLAG_FIFO 先进先出 flag 创建的 IPC 对象,在多个线程等待消 息队列资源时,将按照先来先得的顺序获得资源
3、消息队列的删除函数
/**
* This function will delete a message queue object and release the memory
*
* @param mq the message queue object
*
* @return the error code
*/
rt_err_t rt_mq_delete(rt_mq_t mq)
RT_DEBUG_NOT_IN_INTERRUPT;
/* parameter check */
// 检查消息队列是否存在
RT_ASSERT(mq != RT_NULL);
/* resume all suspended thread */
// 恢复所有因为访问此队列而阻塞的线程,线程得到队列返回的错误代码
rt_ipc_list_resume_all(&(mq->parent.suspend_thread));
// 消息队列对象属于应用程序模块 ,此处不使用
#if defined(RT_USING_MODULE) && defined(RT_USING_SLAB)
/* the mq object belongs to an application module */
if (mq->parent.parent.flag & RT_OBJECT_FLAG_MODULE)
rt_module_free(mq->parent.parent.module_id, mq->msg_pool);
else
#endif
/* free message queue pool */
// 释放消息内存池
RT_KERNEL_FREE(mq->msg_pool);
/* delete message queue object */
// 删除消息队列
rt_object_delete(&(mq->parent.parent));
return RT_EOK;
RTM_EXPORT(rt_mq_delete);
3.1、删除消息队列
static rt_mq_t test_mq = RT_NULL;
rt_err_t del_mq = RT_EOK;
del_mq = rt_mq_delete(test_mq);
if (RT_EOK == del_mq)
rt_kprintf("消息队列已删除!\\n\\n");
4、发送消息列表函数
/**
* This function will send a message to message queue object, if there are
* threads suspended on message queue object, it will be waked up.\\
这个函数将向消息队列对象发送消息,如果有的话线程挂起消息队列对象,它将被唤醒
*
* @param mq the message queue object
* @param buffer the message
* @param size the size of buffer
*
* @return the error code
*/
// void *buffer 是即将发送消息的存储地址;rt_size_t size 是即将发送消息的大小
rt_err_t rt_mq_send(rt_mq_t mq, void *buffer, rt_size_t size)
register rt_ubase_t temp;
struct rt_mq_message *msg;
// 检测传递进来的参数,如果这些参数之中有一个是无效的, 都无法发送消息。
RT_ASSERT(mq != RT_NULL);
RT_ASSERT(buffer != RT_NULL);
RT_ASSERT(size != 0);
/* greater than one message size */
// 判断消息的大小,其大小不能超过创建时候设置的消息队列 的大小 mq->msg_size,用户可以自定义大小的,如果 mq->msg_size 不够,可 以在创建时候设置大一些。
if (size > mq->msg_size)
return -RT_ERROR;
// 钩子函数调用
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));
/* disable interrupt */
temp = rt_hw_interrupt_disable();
// 获取一个空闲链表指针,必须有一个空闲链表节点用于存放 要发送的消息。如果消息队列已经满了,则无法发送消息。
/* get a free list, there must be an empty item */
msg = (struct rt_mq_message *)mq->msg_queue_free;
/* message queue is full */
if (msg == RT_NULL)
/* enable interrupt */
// 使能中断
rt_hw_interrupt_enable(temp);
return -RT_EFULL;
/* move free list pointer */
// 移动空闲列表指针
mq->msg_queue_free = msg->next;
/* enable interrupt */
// 使能中断
rt_hw_interrupt_enable(temp);
/* the msg is the new tailer of list, the next shall be NULL */
// msg列表下个节点为空
msg->next = RT_NULL;
/* copy buffer */
// 拷贝数据,将即将发送的数据拷贝到空闲链表的节点中,因 为空闲节点有消息头,所以其真正存放消息的地址是 msg + 1。
rt_memcpy(msg + 1, buffer, size);
/* disable interrupt */
// 失能中断
temp = rt_hw_interrupt_disable();
/* link msg to message queue */
// 将空闲队列的消息挂载到消息队列尾部,如果此时消息队列 已经有消息,也就是尾部链表不为空,那么就直接将发送的消息挂载到尾部链表 后面。
if (mq->msg_queue_tail != RT_NULL)
/* if the tail exists, */
((struct rt_mq_message *)mq->msg_queue_tail)->next = msg;
/* set new tail */
// 重置消息队列尾链表指针,指向当前发送的消息,无论当前 消息队列中尾链表是否有消息,都需要重置尾链表指针的指向
mq->msg_queue_tail = msg;
// 如果连头链表是空的,就需要设置头部链表指针指向当前要 发送的消息,也就是指向消息自身。
/* if the head is empty, set head */
if (mq->msg_queue_head == RT_NULL)
mq->msg_queue_head = msg;
/* increase message entry */
// 记录当前消息队列的消息个数,自加 1
mq->entry ++;
/* resume suspended thread */
// 恢复挂起线程。如果当前有线程因为访问队列而进入阻塞, 现在有消息了则可以将该线程从阻塞中恢复
if (!rt_list_isempty(&mq->parent.suspend_thread))
rt_ipc_list_resume(&(mq->parent.suspend_thread));
/* enable interrupt */
rt_hw_interrupt_enable(temp);
rt_schedule();
return RT_EOK;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return RT_EOK;
RTM_EXPORT(rt_mq_send);
4.1、发送消息线程
//发送消息线程
/*发送消息时,发送者需指定发送到的消息队列的对象句柄(即指向消息队列 控制块的指针),并且指定发送的消息内容以及消息大小,在发送一个普通消息 之后,空闲消息链表上的消息被转移到了消息队列尾链表上*/
static void send_msg_thread_entry(void* parameter)
u8 key=0;
rt_err_t erflag=0;
u16 msg_value1=1;
u16 msg_value2=2;
while(1)
key=KEY_Scan(0);
if(key==KEY_UP_PRESS)
// 消息队列对象 消息缓冲区 消息缓冲区大小
erflag=rt_mq_send(test_mq,&msg_value1,sizeof(msg_value1));
if(erflag!=RT_EOK) // 发送消息出错
rt_kprintf("数据不能发送到消息队列!错误代码:0X%lx\\r\\n",erflag);
else if(key==KEY1_PRESS)
erflag=rt_mq_send(test_mq,&msg_value2,sizeof(msg_value2));
if(erflag!=RT_EOK)
rt_kprintf("数据不能发送到消息队列!错误代码:0X%lx\\r\\n",erflag);
rt_thread_delay(20); /* 延时20个tick */
4.2、创建发送消息线程
//创建发送消息线程
send_msg_thread =rt_thread_create(
"send_msg", /* 线程名字 */
send_msg_thread_entry, /* 线程入口函数 */
RT_NULL, /* 线程入口函数参数 */
512, /* 线程栈大小 */
2, /* 线程的优先级 */
20); /* 线程时间片 */
/* 启动线程,开启调度 */
if(send_msg_thread != RT_NULL)
rt_thread_startup(send_msg_thread);
else
return -1;
五、接收消息
/**
* This function will receive a message from message queue object, if there is
* no message in message queue object, the thread shall wait for a specified
* time.
*
* @param mq the message queue object
* @param buffer the received message will be saved in
* @param size the size of buffer
* @param timeout the waiting time
*
* @return the error code
*/
rt_err_t rt_mq_recv(rt_mq_t mq, // 消息队列的句柄/消息控制块/消息队列结构体
void *buffer, // buffer 是用于接收消息的数据存储地址,必须在接收之前就定义,确保地址有效
rt_size_t size, // 消息大小
rt_int32_t timeout) // 超时时间
// 线程
struct rt_thread *thread;
// 寄存器变量
register rt_ubase_t temp;
// 消息队列
struct rt_mq_message *msg;
rt_uint32_t tick_delta;
// 检查形参是否有效,有效才进行消息队列的数据 读取
RT_ASSERT(mq != RT_NULL);
RT_ASSERT(buffer != RT_NULL);
RT_ASSERT(size != 0);
/* initialize delta tick */
// 初始化参数
tick_delta = 0;
/* get current thread */
// 获取当前线程
thread = rt_thread_self();
// 调用钩子函数
RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mq->parent.parent)));
/* disable interrupt */
// 失能中断
temp = rt_hw_interrupt_disable();
/* for non-blocking call */
//如果当前消息队列中没有消息并且设置了不等待,则立即返 回错误代码。
if (mq->entry == 0 && timeout == 0)
rt_hw_interrupt_enable(temp);
return -RT_ETIMEOUT;
/* message queue is empty */
// 非阻塞情况
// 如果消息队列为空,但是用户设置了等待时间,则进入循环 中
while (mq->entry == 0)
RT_DEBUG_IN_THREAD_CONTEXT;
/* reset error number in thread */
// 重置线程中的错误码
thread->error = RT_EOK;
/* no waiting, return timeout */
// 不等待
if (timeout == 0)
/* enable interrupt */
rt_hw_interrupt_enable(temp);
thread->error = -RT_ETIMEOUT;
return -RT_ETIMEOUT;
/* suspend current thread */
// 挂起当前线程,因为当前线程是由于消息队列为空,并且用 户设置了超时时间,直接将当前线程挂起,进入阻塞状态
rt_ipc_list_suspend(&(mq->parent.suspend_thread),
thread,
mq->parent.parent.flag);
/* has waiting time, start thread timer */
//用户有设置等待时间,需要启动线程计时器,并且调用 rt_tick_get()函数获取当前系统 systick 时间
if (timeout > 0)
/* get the start tick of timer */
tick_delta = rt_tick_get();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\\n",
thread->name));
/* reset the timeout of thread timer and start it */
// 重置线程计时器的超时并启动它,调用 rt_timer_control() 函数改变当前线程阻塞时间,阻塞的时间根据用户自定义的 timeout 设置,并 且调用 rt_timer_start() 函数开始定时
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&timeout);
rt_timer_start(&(thread->thread_timer));
/* enable interrupt */
// 使能中断
rt_hw_interrupt_enable(temp);
/* re-schedule */
// 发起一次线程调度。当前线程都已经挂起了,需要进行线程 切换
rt_schedule();
/* recv message */
// 线程错误
if (thread->error != RT_EOK)
/* return error */
return thread->error;
/* disable interrupt */
// 失能中断
temp = rt_hw_interrupt_disable();
/* if it's not waiting forever and then re-calculate timeout tick */
// 如果它不是永远等待,然后重新计算超时节拍
if (timeout > 0)
tick_delta = rt_tick_get() - tick_delta;
timeout -= tick_delta;
if (timeout < 0)
timeout = 0;
/* get message from queue */
// 获取列表消息 如果当前消息队列中有消息,那么获取消息队列的线程可以 直接从消息队列的 msg_queue_head 链表获取到消息,并不会进入阻塞态中。
msg = (struct rt_mq_message *)mq->msg_queue_head;
/* move message queue head */
// 移动消息队列头链表指针。重置消息队列的 msg_queue_head 指向当前消息的下一个消息。因为当前的消息被取走了,下一个消息才是可获取 的有效消息。
mq->msg_queue_head = msg->next;
/* reach queue tail, set to NULL */
// :如果到达队列尾部,则将消息队列的 msg_queue_tail 设置 为 NULL
if (mq->msg_queue_tail == msg)
mq->msg_queue_tail = RT_NULL;
/* decrease message entry */
// 记录当前消息队列中消息的个数,entry 减一,消息就是获 取了一个就少一个
mq->entry --;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* copy message */
// 拷贝消息到指定存储地址 buffer,拷贝消息的大小为 size, 其大小最大不能超过创建消息队列时候已经定义的消息大小 msg_size
rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size);
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* put message to free list */
// 获取一个消息后,消息队列上的头链表消息被转移到空闲消 息链表中,相当于消息的删除操作,这样子可以保证消息队列的循环利用,而不会 导致头链表指针移动到队列尾部时没有可用的消息节点。
msg->next = (struct rt_mq_message *)mq->msg_queue_free;
mq->msg_queue_free = msg;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
// 调用内核对象绑定的钩子函数 函数指针 函数参数
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent)));
return RT_EOK;
RTM_EXPORT(rt_mq_recv);
5.1、接受消息函数
**
* IPC flags and control command definitions
*/
#define RT_IPC_FLAG_FIFO 0x00 /**< FIFOed IPC. @ref IPC. */
#define RT_IPC_FLAG_PRIO 0x01 /**< PRIOed IPC. @ref IPC. */
#define RT_IPC_CMD_UNKNOWN 0x00 /**< unknown IPC command */
#define RT_IPC_CMD_RESET 0x01 /**< reset IPC object */
#define RT_WAITING_FOREVER -1 /**< Block forever until get resource. */
#define RT_WAITING_NO 0 /**< Non-block. */
/* 定义消息队列控制块 */
static rt_mq_t test_mq = RT_NULL;
//接收消息线程
static void rec_msg_thread_entry(void* parameter)
rt_err_t erflag=0;
u16 rec_msg_value=0;
while(1)
erflag=rt_mq_recv(test_mq,&rec_msg_value,sizeof(rec_msg_value),RT_WAITING_FOREVER);//一直等待
if(erflag==RT_EOK)
rt_kprintf("本次接收的数据是:%d\\r\\n",rec_msg_value);
else
rt_kprintf("数据接收错误!错误代码:0X%lx\\r\\n",erflag);
rt_thread_delay(20); /* 延时20个tick */
5.2、创建接收消息线程
//创建接收消息线程
rec_msg_thread =rt_thread_create(
"rec_msg", /* 线程名字 */
rec_msg_thread_entry, /* 线程入口函数 */
RT_NULL, /* 线程入口函数参数 */
512, /* 线程栈大小 */
3, /* 线程的优先级 */
20); /* 线程时间片 */
/* 启动线程,开启调度 */
if(rec_msg_thread != RT_NULL)
rt_thread_startup(rec_msg_thread);
else
return -1;
以上是关于rt_thread的消息队列的主要内容,如果未能解决你的问题,请参考以下文章