Raw-OS源代码分析之消息系统-Queue_Buffer
Posted mthoutai
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Raw-OS源代码分析之消息系统-Queue_Buffer相关的知识,希望对你有一定的参考价值。
分析的内核版本号截止到2014-04-15,基于1.05正式版。blogs会及时跟进最新版本号的内核开发进度,若源代码凝视出现”???”字样,则是未深究理解部分。
Raw-OS官方站点:http://www.raw-os.org/
Raw-OS托管地址:https://github.com/jorya/raw-os/
有了之前的queue和queue_size的解读之后。这个queue_buffer就很之简单了~对于queue是转发消息的指针,queue_size则是转发消息指针和消息大小。那么queue_buffer不在转存消息的指针,而是将消息copy一份,转发消息内容和消息大小。
这个定义一个queue_buffer消息的概念~
一个queue_buffer消息包含消息的大小,消息详细数据内容,然后依照上图的结构进行排列~那么在queue_buffer中,详细消息是封装成以上的结构进行转发的~
那么在queue_buffer建立的内部数据存储仓库,queue_buffer的消息按这样排列的
当中,head指针指向queue_buffer消息存在的開始位置。tail指针指向queue_buffer消息存在的末尾位置,剩余的没转存消息的位置称为空暇消息
还有注意一点的就是。queue_buffer的queue_buffer消息一定要按4字节对齐的形式存储在queue_buffer的存储仓中,原因的话要详细看代码才干说明确
Queue_buffer消息的转存过程和queue和queue_size一样,仅仅是内核多了复制详细数据这一部的操作,至于在转存仓库的组织形式,都是按各自的消息结构体来组织。queue消息是指向消息的void指针,queue_size就是以消息指针和消息大小组成的queue_size结构体,那么到这里queue_buffer就是以消息长度+消息实际内容的形式。
代码仅仅说明queue_buffer建立。发送queue_buffer消息到queue_buffer的部分
1.queue_buffer的创建
RAW_U16 raw_queue_buffer_create(RAW_QUEUE_BUFFER *q_b, RAW_U8 *p_name, RAW_VOID *msg_buffer, MSG_SIZE_TYPE buffer_size, MSG_SIZE_TYPE max_msg_size) { MSG_SIZE_TYPE bufsz; RAW_U8 queue_buffer_align_mask; /* 检查创建消息buffer的边界条件 */ #if (RAW_QUEUE_BUFFER_FUNCTION_CHECK > 0) if (q_b == 0) { return RAW_NULL_OBJECT; } if (msg_buffer == 0) { return RAW_NULL_POINTER; } #endif /* 消息buffer的大小必须满足4字节的整数倍 */ bufsz = ROUND_BUFFER_SIZE(buffer_size); /* 消息buffer大小为0时,错误返回 */ if (bufsz == 0) { return RAW_QUEUE_BUFFER_SIZE_0; } /* 消息buffer大小不满足4字节对齐时,错误返回 */ if (bufsz != buffer_size) { return RAW_QUEUE_BUFFER_INVALID_SIZE; } queue_buffer_align_mask = HEADERSZ - 1u; /* 消息buffer的内部存储消息的仓库的地址相同也要满足4字节的整数倍对齐 */ if (((RAW_U32)msg_buffer & queue_buffer_align_mask)){ return RAW_INVALID_ALIGN; } /* 初始化消息buffer的堵塞链表头 */ list_init(&q_b->common_block_obj.block_list); q_b->bufsz = bufsz; q_b->frbufsz = bufsz; q_b->buffer = msg_buffer; q_b->maxmsz = max_msg_size; q_b->head = 0; q_b->tail = 0; q_b->common_block_obj.name = p_name; q_b->common_block_obj.block_way = RAW_BLOCKED_WAY_PRIO; q_b->common_block_obj.object_type = RAW_QUEUE_BUFFER_OBJ_TYPE; TRACE_QUEUE_BUFFER_CREATE(raw_task_active, q_b); return RAW_SUCCESS; }
2.发送queue_buffer消息
RAW_U16 raw_queue_buffer_end_post(RAW_QUEUE_BUFFER *q_b, RAW_VOID *p_void, MSG_SIZE_TYPE msg_size) { /* 发送queue buffer消息时的边界条件检查 */ #if (RAW_QUEUE_BUFFER_FUNCTION_CHECK > 0) if (q_b == 0) { return RAW_NULL_OBJECT; } if (p_void == 0) { return RAW_NULL_POINTER; } /* 发送queue buffer消息长度不能超过创建queue_buffer定义的最大消息长度 */ if (msg_size > q_b->maxmsz) { return RAW_EXCEED_QUEUE_BUFFER_MSG_SIZE; } #endif /* 开启0中断特性后。不能在中断ISR中发送queue_buffer,而queue和queue_size能够,想想为什么??? */ #if (CONFIG_RAW_ZERO_INTERRUPT > 0) if (raw_int_nesting) { return RAW_NOT_CALLED_BY_ISR; } #endif return queue_buffer_post(q_b, p_void, msg_size, SEND_TO_END); }
存储queue_buffer的核心代码
RAW_U16 queue_buffer_post(RAW_QUEUE_BUFFER *q_b, RAW_VOID *p_void, MSG_SIZE_TYPE msg_size, RAW_U8 opt_send_method) { LIST *block_list_head; RAW_TASK_OBJ *task_ptr; RAW_SR_ALLOC(); RAW_CRITICAL_ENTER(); if (q_b->common_block_obj.object_type != RAW_QUEUE_BUFFER_OBJ_TYPE) { RAW_CRITICAL_EXIT(); return RAW_ERROR_OBJECT_TYPE; } block_list_head = &q_b->common_block_obj.block_list; if (!is_queue_buffer_free(q_b, msg_size)) { RAW_CRITICAL_EXIT(); TRACE_QUEUE_BUFFER_MAX(raw_task_active, q_b, p_void, msg_size, opt_send_method); return RAW_QUEUE_BUFFER_FULL; } /* 当queue_buffer内部存储仓未满时,就会将消息转存到存储仓里面。并且内部仅仅实现发送存储仓末尾的操作 */ if (is_list_empty(block_list_head)) { if (opt_send_method == SEND_TO_END) { msg_to_end_buffer(q_b, p_void, msg_size); } else { /* 没有定义有除发送到存储仓末尾外的操作 */ } RAW_CRITICAL_EXIT(); TRACE_QUEUE_BUFFER_POST(raw_task_active, q_b, p_void, msg_size, opt_send_method); return RAW_SUCCESS; } /* * 假设有任务堵塞在queue_buffer上等待消息,个人觉得以下这些操作是为了加快消息传递, * 不由queue_buffer的内部存储仓转存数据内容。少了一个二次copy的过程 * * 找到queue_buffer的堵塞链表的最高优先级任务,然后直接将消息内容和大小copy到任务控制块中,然后唤醒该任务 */ task_ptr = list_entry(block_list_head->next, RAW_TASK_OBJ, task_list); raw_memcpy(task_ptr->msg, p_void, msg_size); task_ptr->qb_msg_size = msg_size; raw_wake_object(task_ptr); RAW_CRITICAL_EXIT(); TRACE_QUEUE_BUFFER_WAKE_TASK(raw_task_active, list_entry(block_list_head->next, RAW_TASK_OBJ, task_list), p_void, msg_size, opt_send_method); raw_sched(); return RAW_SUCCESS; }
以上是关于Raw-OS源代码分析之消息系统-Queue_Buffer的主要内容,如果未能解决你的问题,请参考以下文章
Memcached源代码分析 - Memcached源代码分析之消息回应
高吞吐量的分布式发布订阅消息系统Kafka之Producer源码分析