rt_thread的消息队列

Posted 旭日初扬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rt_thread的消息队列相关的知识,希望对你有一定的参考价值。

目录

前言

一、概念

 二、实例

1、消息队列控制块

2、消息队列创建函数

2.1、创建消息队列

3、消息队列的删除函数

3.1、删除消息队列

4、发送消息列表函数

4.1、发送消息线程

4.2、创建发送消息线程

五、接收消息

5.1、接受消息函数

5.2、创建接收消息线程

总结


前言

消息队列是一种异步通信的方式,用于线程线程,线程与中断之间的信息交换。

一、概念

  • 通过消息队列服务,线程或中断服务例程可以将一条或多条消息放入消息队列中。
  • 一个或多个线程可以从消息队列中获得消息。当有多个消息发送到 消息队列时,通常是将先进入消息队列的消息先传给线程,也就是说,线程先得 到的是最先进入消息队列的消息,即先进先出原则(FIFO)。
  • RT-Thread 中 的消息队列支持优先级,也就是说在所有等待消息的线程中优先级最高的会先获 得消息。
  • 消息支持先进先出方式排队与优先级排队方式,支持异步读写工作方式。读队列支持超时机制。
  • 支持发送紧急消息,这里的紧急消息是往队列头发送消息。
  • 可以允许不同长度(不超过队列节点最大值)的任意类型消息。
  • 一个线程能够从任意一个消息队列接收和发送消息。
  • 多个线程能够从同一个消息队列接收和发送消息。
  • -当队列使用结束后,需要通过删除队列操作释放内存函数回收。消息队列可以应用于发送不定长消息的场合,包括线程与线程间的消息交换,以及在中断服务函数中给线程发送消息(中断服务例程不可能接收消息)。

  • 当消息队列不再被使用时,应该删除它以释放系统资源,一旦操作完成,消 息队列将被永久性的删除。

 二、实例

1、消息队列控制块

#ifdef RT_USING_MESSAGEQUEUE
/**
 * message queue structure
 * 消息队列控制块
 */
struct rt_messagequeue
   
	//  内核 对象类型的成员,通过这个成员可以将消息队列挂到系统对象容器里面
    struct rt_ipc_object parent;                        /**< inherit from ipc_object */
    //  存放消息的消息池开始地址
    void                *msg_pool;                      /**< start address of message queue */
    //  每条消息大小,消息队列中也就是节点的大小,单位为字节
    rt_uint16_t          msg_size;                      /**< message size of each message */
	  //  能够容纳的最大消息数量
    rt_uint16_t          max_msgs;                      /**< max number of messages */
    // 队列中的消息索引,记录消息队列的消息个数。
    rt_uint16_t          entry;                         /**< index of messages in the queue */
   // 链表头指针,指向即将读取数据的节点
    void                *msg_queue_head;                /**< list head */
	//  链表尾指针,指向允许写入数据的节点
    void                *msg_queue_tail;                /**< list tail */
	// 指向队列的空闲节点的指针
    void                *msg_queue_free;                /**< pointer indicated the free node of queue */
;
typedef struct rt_messagequeue *rt_mq_t;
#endif

2、消息队列创建函数

/**
 * This function will create a message queue object from system resource
 *
 * @param name the name of message queue
 * @param msg_size the size of message
 * @param max_msgs the maximum number of message in queue
 * @param flag the flag of message queue
 *
 * @return the created message queue, RT_NULL on error happen
 */
rt_mq_t rt_mq_create(const char *name,      //  消息队列名称
                     rt_size_t   msg_size,  //  大小
                     rt_size_t   max_msgs,  //  容量
                     rt_uint8_t  flag)

	  //  结构体指针
    struct rt_messagequeue *mq;
    struct rt_mq_message *head;
    register rt_base_t temp;
    
	  //  中断状态
    RT_DEBUG_NOT_IN_INTERRUPT;

    /* allocate object  为创建的消息队列分配一个消息队列的对象,并且命名对象 名称,且名称唯一 */
    mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name);
    if (mq == RT_NULL)
        return mq;

    /* set parent */
		// 设置消息队列的阻塞唤醒模式
		// 使用 RT_IPC_FLAG_PRIO 优先级 flag 创建的 IPC 对象,在多个线程等待消息队列资源时,将由优先级高的线程优先获得资源.
		// 使用 RT_IPC_FLAG_FIFO 先进先出 flag 创建的 IPC 对象,在多个线程等待消 息队列资源时,将按照先来先得的顺序获得资源
    mq->parent.parent.flag = flag;

    /* init ipc object */

		//  获取消息队列大小
		// 初始化一个链表,用于记 录访问此队列而阻塞的线程,通过这个链表,可以找到对应的阻塞线程的控制块, 从而能恢复线程
    rt_ipc_object_init(&(mq->parent));

    /* init message queue */

    /* get correct message size */
		
			//  设置消息队列的大小与容量
    mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE);
    mq->max_msgs = max_msgs;

    /* allocate message pool */
	
		//  给此消息队列分配内存。这块内存的大小为[消息大小+消息 头大小]与消息队列容量的乘积,每个消息节点中都有一个消息头,用于链表链 接,指向下一个消息节点,作为消息的排序
    mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) * mq->max_msgs);
    if (mq->msg_pool == RT_NULL)
    
        rt_mq_delete(mq);

        return RT_NULL;
    

    /* init message list */
    mq->msg_queue_head = RT_NULL;
    mq->msg_queue_tail = RT_NULL;

    /* init message empty list */
    mq->msg_queue_free = RT_NULL;
    for (temp = 0; temp < mq->max_msgs; temp ++)  // 将所有的消息队列的节点连接起来,形成空闲链表
    
        head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +
                                        temp * (mq->msg_size + sizeof(struct rt_mq_message)));
        head->next = mq->msg_queue_free;
        mq->msg_queue_free = head;
    

    /* the initial entry is zero */
		// 消息队列清0
    mq->entry = 0;

    return mq;

RTM_EXPORT(rt_mq_create);

2.1、创建消息队列

//  定义消息队列控制块
static rt_mq_t test_mq = RT_NULL;

test_mq = rt_mq_create(
"test_mq"    //  消息队列名称
64           //  消息队列的最低长度
30          //  消息队列的最大容量
RT_IPC_FLAG_FIFO   //  队列模式 
);
// 使用 RT_IPC_FLAG_PRIO 优先级 flag 创建的 IPC 对象,在多个线程等待消息队列资源时,将由优先级高的线程优先获得资源

//  使用 RT_IPC_FLAG_FIFO 先进先出 flag 创建的 IPC 对象,在多个线程等待消 息队列资源时,将按照先来先得的顺序获得资源

3、消息队列的删除函数

/**
 * This function will delete a message queue object and release the memory
 *
 * @param mq the message queue object
 *
 * @return the error code
 */
rt_err_t rt_mq_delete(rt_mq_t mq)

    RT_DEBUG_NOT_IN_INTERRUPT;

    /* parameter check */
	  //  检查消息队列是否存在
    RT_ASSERT(mq != RT_NULL);

    /* resume all suspended thread */
	  // 恢复所有因为访问此队列而阻塞的线程,线程得到队列返回的错误代码
    rt_ipc_list_resume_all(&(mq->parent.suspend_thread));

	// 消息队列对象属于应用程序模块 ,此处不使用
#if defined(RT_USING_MODULE) && defined(RT_USING_SLAB)
    /* the mq object belongs to an application module */
    if (mq->parent.parent.flag & RT_OBJECT_FLAG_MODULE)
        rt_module_free(mq->parent.parent.module_id, mq->msg_pool);
    else
#endif

        /* free message queue pool */
	      //  释放消息内存池
        RT_KERNEL_FREE(mq->msg_pool);

    /* delete message queue object */
		// 删除消息队列
    rt_object_delete(&(mq->parent.parent));

    return RT_EOK;

RTM_EXPORT(rt_mq_delete);

3.1、删除消息队列

static rt_mq_t test_mq = RT_NULL; 
rt_err_t del_mq = RT_EOK; 
del_mq = rt_mq_delete(test_mq); 
 if (RT_EOK == del_mq) 
 rt_kprintf("消息队列已删除!\\n\\n");

4、发送消息列表函数

/**
 * This function will send a message to message queue object, if there are
 * threads suspended on message queue object, it will be waked up.\\
     这个函数将向消息队列对象发送消息,如果有的话线程挂起消息队列对象,它将被唤醒
 *
 * @param mq the message queue object
 * @param buffer the message
 * @param size the size of buffer
 *
 * @return the error code
 */
// void *buffer 是即将发送消息的存储地址;rt_size_t size 是即将发送消息的大小
rt_err_t rt_mq_send(rt_mq_t mq, void *buffer, rt_size_t size)

    register rt_ubase_t temp;
    struct rt_mq_message *msg;

	 //  检测传递进来的参数,如果这些参数之中有一个是无效的, 都无法发送消息。
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT(buffer != RT_NULL);
    RT_ASSERT(size != 0);

    /* greater than one message size */
	  // 判断消息的大小,其大小不能超过创建时候设置的消息队列 的大小 mq->msg_size,用户可以自定义大小的,如果 mq->msg_size 不够,可 以在创建时候设置大一些。
    if (size > mq->msg_size)
        return -RT_ERROR;

		 //  钩子函数调用
    RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));

    /* disable interrupt */
    temp = rt_hw_interrupt_disable();

		//  获取一个空闲链表指针,必须有一个空闲链表节点用于存放 要发送的消息。如果消息队列已经满了,则无法发送消息。
    /* get a free list, there must be an empty item */
    msg = (struct rt_mq_message *)mq->msg_queue_free;
    /* message queue is full */
    if (msg == RT_NULL)
    
        /* enable interrupt */
			  //  使能中断
        rt_hw_interrupt_enable(temp);

        return -RT_EFULL;
    
    /* move free list pointer */
		//  移动空闲列表指针
    mq->msg_queue_free = msg->next;

    /* enable interrupt */
		// 使能中断
    rt_hw_interrupt_enable(temp);

    /* the msg is the new tailer of list, the next shall be NULL */
		// msg列表下个节点为空
    msg->next = RT_NULL;
    /* copy buffer */
		// 拷贝数据,将即将发送的数据拷贝到空闲链表的节点中,因 为空闲节点有消息头,所以其真正存放消息的地址是 msg + 1。
    rt_memcpy(msg + 1, buffer, size);

    /* disable interrupt */
		//  失能中断
    temp = rt_hw_interrupt_disable();
		
    /* link msg to message queue */
		//  将空闲队列的消息挂载到消息队列尾部,如果此时消息队列 已经有消息,也就是尾部链表不为空,那么就直接将发送的消息挂载到尾部链表 后面。
    if (mq->msg_queue_tail != RT_NULL)
    
        /* if the tail exists, */
        ((struct rt_mq_message *)mq->msg_queue_tail)->next = msg;
    

    /* set new tail */
		// 重置消息队列尾链表指针,指向当前发送的消息,无论当前 消息队列中尾链表是否有消息,都需要重置尾链表指针的指向
    mq->msg_queue_tail = msg;
		// 如果连头链表是空的,就需要设置头部链表指针指向当前要 发送的消息,也就是指向消息自身。
    /* if the head is empty, set head */
    if (mq->msg_queue_head == RT_NULL)
        mq->msg_queue_head = msg;

    /* increase message entry */
		// 记录当前消息队列的消息个数,自加 1
    mq->entry ++;

    /* resume suspended thread */
		// 恢复挂起线程。如果当前有线程因为访问队列而进入阻塞, 现在有消息了则可以将该线程从阻塞中恢复
    if (!rt_list_isempty(&mq->parent.suspend_thread))
    
        rt_ipc_list_resume(&(mq->parent.suspend_thread));

        /* enable interrupt */
        rt_hw_interrupt_enable(temp);

        rt_schedule();

        return RT_EOK;
    

    /* enable interrupt */
    rt_hw_interrupt_enable(temp);

    return RT_EOK;

RTM_EXPORT(rt_mq_send);

4.1、发送消息线程

//发送消息线程
/*发送消息时,发送者需指定发送到的消息队列的对象句柄(即指向消息队列 控制块的指针),并且指定发送的消息内容以及消息大小,在发送一个普通消息 之后,空闲消息链表上的消息被转移到了消息队列尾链表上*/
static void send_msg_thread_entry(void* parameter)

	u8 key=0;
	rt_err_t erflag=0;
	u16 msg_value1=1;
	u16 msg_value2=2;
	
	while(1)
	
		key=KEY_Scan(0);
		if(key==KEY_UP_PRESS)
		
			 //  消息队列对象    消息缓冲区    消息缓冲区大小
			erflag=rt_mq_send(test_mq,&msg_value1,sizeof(msg_value1));
			if(erflag!=RT_EOK)  //  发送消息出错
				rt_kprintf("数据不能发送到消息队列!错误代码:0X%lx\\r\\n",erflag);
		
		else if(key==KEY1_PRESS)
		
			erflag=rt_mq_send(test_mq,&msg_value2,sizeof(msg_value2));
			if(erflag!=RT_EOK)
				rt_kprintf("数据不能发送到消息队列!错误代码:0X%lx\\r\\n",erflag);
		
		rt_thread_delay(20);   /* 延时20个tick */
	

4.2、创建发送消息线程

	//创建发送消息线程
	send_msg_thread =rt_thread_create( 
					"send_msg",              /* 线程名字 */
                     send_msg_thread_entry,   /* 线程入口函数 */
                     RT_NULL,             /* 线程入口函数参数 */
                     512,                 /* 线程栈大小 */
                     2,                   /* 线程的优先级 */
                     20);                 /* 线程时间片 */
	
	/* 启动线程,开启调度 */
    if(send_msg_thread != RT_NULL)
		rt_thread_startup(send_msg_thread);
    else
		return -1;

五、接收消息

/**
 * This function will receive a message from message queue object, if there is
 * no message in message queue object, the thread shall wait for a specified
 * time.
 *
 * @param mq the message queue object
 * @param buffer the received message will be saved in
 * @param size the size of buffer
 * @param timeout the waiting time
 *
 * @return the error code
 */
rt_err_t rt_mq_recv(rt_mq_t    mq,     //  消息队列的句柄/消息控制块/消息队列结构体
                    void      *buffer, //  buffer 是用于接收消息的数据存储地址,必须在接收之前就定义,确保地址有效
                    rt_size_t  size,   //  消息大小
                    rt_int32_t timeout) // 超时时间
    
	  //  线程
    struct rt_thread *thread;
	  // 寄存器变量
    register rt_ubase_t temp;
	 // 消息队列
    struct rt_mq_message *msg;
    rt_uint32_t tick_delta;
  
	 //  检查形参是否有效,有效才进行消息队列的数据 读取
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT(buffer != RT_NULL);
    RT_ASSERT(size != 0);

    /* initialize delta tick */
	  // 初始化参数
    tick_delta = 0;
    /* get current thread */
	  // 获取当前线程
    thread = rt_thread_self();
	// 调用钩子函数
    RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mq->parent.parent)));

    /* disable interrupt */
		// 失能中断
    temp = rt_hw_interrupt_disable();

    /* for non-blocking call */
		//如果当前消息队列中没有消息并且设置了不等待,则立即返 回错误代码。
    if (mq->entry == 0 && timeout == 0)
    
        rt_hw_interrupt_enable(temp);

        return -RT_ETIMEOUT;
    

    /* message queue is empty */
		// 非阻塞情况
		// 如果消息队列为空,但是用户设置了等待时间,则进入循环 中
    while (mq->entry == 0)
    
        RT_DEBUG_IN_THREAD_CONTEXT;

        /* reset error number in thread */
			 // 重置线程中的错误码
        thread->error = RT_EOK;

        /* no waiting, return timeout */
			 // 不等待
        if (timeout == 0)
        
            /* enable interrupt */
            rt_hw_interrupt_enable(temp);

            thread->error = -RT_ETIMEOUT;

            return -RT_ETIMEOUT;
        

        /* suspend current thread */
				// 挂起当前线程,因为当前线程是由于消息队列为空,并且用 户设置了超时时间,直接将当前线程挂起,进入阻塞状态
        rt_ipc_list_suspend(&(mq->parent.suspend_thread),
                            thread,
                            mq->parent.parent.flag);

        /* has waiting time, start thread timer */
				//用户有设置等待时间,需要启动线程计时器,并且调用 rt_tick_get()函数获取当前系统 systick 时间
        if (timeout > 0)
        
            /* get the start tick of timer */
            tick_delta = rt_tick_get();

            RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\\n",
                                        thread->name));

            /* reset the timeout of thread timer and start it */
					  // 重置线程计时器的超时并启动它,调用 rt_timer_control() 函数改变当前线程阻塞时间,阻塞的时间根据用户自定义的 timeout 设置,并 且调用 rt_timer_start() 函数开始定时
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &timeout);
            rt_timer_start(&(thread->thread_timer));
        

        /* enable interrupt */
				//  使能中断
        rt_hw_interrupt_enable(temp);

        /* re-schedule */
				// 发起一次线程调度。当前线程都已经挂起了,需要进行线程 切换
        rt_schedule();

        /* recv message */
				//  线程错误
        if (thread->error != RT_EOK)
        
            /* return error */
            return thread->error;
        

        /* disable interrupt */
				//  失能中断
        temp = rt_hw_interrupt_disable();

        /* if it's not waiting forever and then re-calculate timeout tick */
				// 如果它不是永远等待,然后重新计算超时节拍
        if (timeout > 0)
        
            tick_delta = rt_tick_get() - tick_delta;
            timeout -= tick_delta;
            if (timeout < 0)
                timeout = 0;
        
    

    /* get message from queue */
		//  获取列表消息  如果当前消息队列中有消息,那么获取消息队列的线程可以 直接从消息队列的 msg_queue_head 链表获取到消息,并不会进入阻塞态中。
    msg = (struct rt_mq_message *)mq->msg_queue_head;

    /* move message queue head */
		//  移动消息队列头链表指针。重置消息队列的 msg_queue_head 指向当前消息的下一个消息。因为当前的消息被取走了,下一个消息才是可获取 的有效消息。
    mq->msg_queue_head = msg->next;
		
    /* reach queue tail, set to NULL */
	   // :如果到达队列尾部,则将消息队列的 msg_queue_tail 设置 为 NULL
    if (mq->msg_queue_tail == msg)
        mq->msg_queue_tail = RT_NULL;

    /* decrease message entry */
		// 记录当前消息队列中消息的个数,entry 减一,消息就是获 取了一个就少一个
    mq->entry --;

    /* enable interrupt */
    rt_hw_interrupt_enable(temp);

    /* copy message */
		//  拷贝消息到指定存储地址 buffer,拷贝消息的大小为 size, 其大小最大不能超过创建消息队列时候已经定义的消息大小 msg_size
    rt_memcpy(buffer, msg + 1, size > mq->msg_size ? mq->msg_size : size);

    /* disable interrupt */
    temp = rt_hw_interrupt_disable();
    /* put message to free list */
		// 获取一个消息后,消息队列上的头链表消息被转移到空闲消 息链表中,相当于消息的删除操作,这样子可以保证消息队列的循环利用,而不会 导致头链表指针移动到队列尾部时没有可用的消息节点。
    msg->next = (struct rt_mq_message *)mq->msg_queue_free;
    mq->msg_queue_free = msg;
		
    /* enable interrupt */
    rt_hw_interrupt_enable(temp);
 
    // 调用内核对象绑定的钩子函数  函数指针    函数参数
    RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mq->parent.parent)));

    return RT_EOK;

RTM_EXPORT(rt_mq_recv);

5.1、接受消息函数

**
 * IPC flags and control command definitions
 */
#define RT_IPC_FLAG_FIFO                0x00            /**< FIFOed IPC. @ref IPC. */
#define RT_IPC_FLAG_PRIO                0x01            /**< PRIOed IPC. @ref IPC. */

#define RT_IPC_CMD_UNKNOWN              0x00            /**< unknown IPC command */
#define RT_IPC_CMD_RESET                0x01            /**< reset IPC object */

#define RT_WAITING_FOREVER              -1              /**< Block forever until get resource. */
#define RT_WAITING_NO                   0               /**< Non-block. */

/* 定义消息队列控制块 */
static rt_mq_t test_mq = RT_NULL; 


//接收消息线程
static void rec_msg_thread_entry(void* parameter)

	rt_err_t erflag=0;
	u16 rec_msg_value=0;
	
	while(1)
	
		erflag=rt_mq_recv(test_mq,&rec_msg_value,sizeof(rec_msg_value),RT_WAITING_FOREVER);//一直等待
		if(erflag==RT_EOK)
			rt_kprintf("本次接收的数据是:%d\\r\\n",rec_msg_value);
		else
			rt_kprintf("数据接收错误!错误代码:0X%lx\\r\\n",erflag);
		rt_thread_delay(20);   /* 延时20个tick */
	

5.2、创建接收消息线程

	//创建接收消息线程
	rec_msg_thread =rt_thread_create( 
					"rec_msg",              /* 线程名字 */
                     rec_msg_thread_entry,   /* 线程入口函数 */
                     RT_NULL,             /* 线程入口函数参数 */
                     512,                 /* 线程栈大小 */
                     3,                   /* 线程的优先级 */
                     20);                 /* 线程时间片 */
	
	/* 启动线程,开启调度 */
    if(rec_msg_thread != RT_NULL)
		rt_thread_startup(rec_msg_thread);
    else
		return -1;

以上是关于rt_thread的消息队列的主要内容,如果未能解决你的问题,请参考以下文章

rt_thread线程间通讯

Kafka笔记整理

Python中queue消息队列模块

如何使用Redis 做队列操作

循环队列的实现

各种消息队列如何选择?为何选择rocketmq来保证消息不丢失,及应该采用rocketmq哪种通信模式?