[RTT] RT-Thread线程调度机制线程切换时机
Posted BRRRRRRRRR_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[RTT] RT-Thread线程调度机制线程切换时机相关的知识,希望对你有一定的参考价值。
1. 问题
最初接触RTT时,对于线程切换时机的相关概念主要来自以下几个方面:
- RTT在创建线程时,需要输入线程的时间片参数,时间片的单位为OS Tick。
- 线程休眠函数rt_thread_delay()、设置软件定时器、以及一些如信号量、邮箱等可以设置timeout的线程间通讯和同步方式,设置的时间参数都是以OS Tick为最小单位。
- RTT文档也写出:操作系统中最小的时间单位是时钟节拍 (OS Tick)。
由此,我便将其理解为:OS Tick也是系统调度的最小单位,每一个OS Tick,操作系统就会执行一次调度器,判断各线程状态和优先级,从而执行线程调度。
但如果该说法成立,似乎又解释不通以下几个问题(以下假设tick为1ms时):
-
线程1在执行200us后就将自己挂起,如果到下一个tick才会运行调度器、运行线程2,明显是对CPU资源的极大浪费,同时期间的800ms时间是被谁占用?切换到空闲线程也是需要调度器来执行的。
-
. 如果软件定时器设置为1ms,软件定时器优先级以下的线程都无法执行?
2. 验证
针对线程调度和OS Tick的理解与以上几个问题的冲突,决定先通过代码测试下我的想法是否正确。
测试代码:
static int thread_run_count = 0;
static int start_tick = 0;
static struct rt_completion tick_test_sema1;
static struct rt_completion tick_test_sema2;
void tick_test_thread1(void* arg)
{
if(thread_run_count == 0)
{
start_tick = rt_tick_get();//记录启动时间
}
while(1)
{
rt_completion_done(&tick_test_sema1);//释放sema1
rt_completion_wait(&tick_test_sema2, RT_WAITING_FOREVER);//等待sema2
if(thread_run_count == 100000)//两个线程切换了10000次
{
rt_kprintf("-----tick count: %d------\\n", rt_tick_get() - start_tick);//输出测试耗时
return;
}
thread_run_count++;
}
}
//测试线程2,信号量1、2互换,其余相同
void tick_test_thread2(void* arg)
{
if(thread_run_count == 0)
{
start_tick = rt_tick_get();
}
while(1)
{
rt_completion_done(&tick_test_sema2);
rt_completion_wait(&tick_test_sema1, RT_WAITING_FOREVER);
if(thread_run_count == 100000)
{
rt_kprintf("-----tick count: %d------\\n", rt_tick_get() - start_tick);
return;
}
thread_run_count++;
}
}
void tick_test(void)
{
rt_completion_init(&tick_test_sema1);
rt_completion_init(&tick_test_sema2);
rt_thread_t test_thread = rt_thread_create("test1",tick_test_thread1, NULL,
1024, 15, 20);
if (test_thread != RT_NULL)
{
rt_thread_startup(test_thread);
}
test_thread = rt_thread_create("test2",tick_test_thread2, NULL,
1024, 15, 20);
if (test_thread != RT_NULL)
{
rt_thread_startup(test_thread);
}
}
两个相同优先级的线程,线程1释放信号量1,之后挂起等待信号量2,线程2释放信号量2,之后挂起等待信号量1。
两个线程交替执行共100000次,输出执行耗时。
如果之前的想法成立,系统仅在每个tick时,才会执行调度器,那10W次的线程切换应至少消耗10W个tick,10W ms,而实际测试耗时为258个tick,258ms。
之后将OS Tick,由1ms改为10ms:
/* Tick per Second */
#define RT_TICK_PER_SECOND 100
相同测试内容,耗时为27个tick,270ms。
很明显,两个线程频繁切换的耗时几乎是固定的,与OS Tick无关,那么之前的想法:OS Tick是系统调度的最小单位很明显就是错误的。
3. 代码分析
- 每个OS Tick中断,会执行一次调度器,这个是之前就分析到的。
void rt_tick_increase(void)
{
struct rt_thread *thread;
/* increase the global tick */
#ifdef RT_USING_SMP
rt_cpu_self()->tick ++;
#else
++ rt_tick;
#endif
/* check time slice */
thread = rt_thread_self();
-- thread->remaining_tick;
if (thread->remaining_tick == 0)
{
/* change to initialized tick */
thread->remaining_tick = thread->init_tick;
thread->stat |= RT_THREAD_STAT_YIELD;
/* yield */
rt_thread_yield();//------里面执行调度器
}
/* check timer */
rt_timer_check();
}
rt_err_t rt_thread_yield(void)
{
rt_schedule();//-----这个是调度器
return RT_EOK;
}
- rt_thread_delay()实际调用的是函数rt_thread_sleep(),进入该函数后查看,在对线程挂起、超时定时器进行设置并运行后,执行了一次调度器:
rt_err_t rt_thread_sleep(rt_tick_t tick)
{
register rt_base_t temp;
struct rt_thread *thread;
/* set to current thread */
thread = rt_thread_self();
RT_ASSERT(thread != RT_NULL);
RT_ASSERT(rt_object_get_type((rt_object_t)thread) == RT_Object_Class_Thread);
/* disable interrupt */
temp = rt_hw_interrupt_disable();
/* suspend thread */
rt_thread_suspend(thread);
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer), RT_TIMER_CTRL_SET_TIME, &tick);
rt_timer_start(&(thread->thread_timer));
/* enable interrupt */
rt_hw_interrupt_enable(temp);
rt_schedule();//-------一样执行了调度器,在这
/* clear error number of this thread to RT_EOK */
if (thread->error == -RT_ETIMEOUT)
thread->error = RT_EOK;
return RT_EOK;
}
因此所有通过delay函数挂起的线程,在挂起后都会立刻执行一次调度器,进行线程切换,无需等到下个OS Tick到来。
- 其他会将该线程挂起的操作:等待信号量、互斥锁,从邮箱、消息队列中阻塞接收数据时也基本相同:将当前线程挂起后执行调度器。以信号量为例:
rt_err_t rt_sem_take(rt_sem_t sem, rt_int32_t time)
{
register rt_base_t temp;
struct rt_thread *thread;
/* parameter check */
RT_ASSERT(sem != RT_NULL);
RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore);
RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(sem->parent.parent)));
/* disable interrupt */
temp = rt_hw_interrupt_disable();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("thread %s take sem:%s, which value is: %d\\n",
rt_thread_self()->name,
((struct rt_object *)sem)->name,
sem->value));
if (sem->value > 0)
{
/* semaphore is available */
sem->value --;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
}
else
{
/* no waiting, return with timeout */
if (time == 0)
{
rt_hw_interrupt_enable(temp);
return -RT_ETIMEOUT;
}
else
{
/* current context checking */
RT_DEBUG_IN_THREAD_CONTEXT;
/* semaphore is unavailable, push to suspend list */
/* get current thread */
thread = rt_thread_self();
/* reset thread error number */
thread->error = RT_EOK;
RT_DEBUG_LOG(RT_DEBUG_IPC, ("sem take: suspend thread - %s\\n",
thread->name));
/* suspend thread */
rt_ipc_list_suspend(&(sem->parent.suspend_thread),
thread,
sem->parent.parent.flag);
/* has waiting time, start thread timer */
if (time > 0)
{
RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\\n",
thread->name));
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&time);
rt_timer_start(&(thread->thread_timer));
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* do schedule */
rt_schedule();//------- timeout内没有获取到信号里昂,线程被挂起,之后执行调度器执行其他线程
if (thread->error != RT_EOK)
{
return thread->error;
}
}
}
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(sem->parent.parent)));
return RT_EOK;
}
- 同时,可能会将其他线程唤醒的操作:通过邮箱发送数据、释放信号量、释放互斥锁等,也会执行一次调度器,以确保此时若有更高优先级的线程因等待该资源而挂起时,可以挂起当前线程来执行高优先级线程。
rt_err_t rt_sem_release(rt_sem_t sem)
{
register rt_base_t temp;
register rt_bool_t need_schedule;
/* parameter check */
RT_ASSERT(sem != RT_NULL);
RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore);
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(sem->parent.parent)));
need_schedule = RT_FALSE;
/* disable interrupt */
temp = rt_hw_interrupt_disable();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("thread %s releases sem:%s, which value is: %d\\n",
rt_thread_self()->name,
((struct rt_object *)sem)->name,
sem->value));
if (!rt_list_isempty(&sem->parent.suspend_thread))
{
/* resume the suspended thread */
rt_ipc_list_resume(&(sem->parent.suspend_thread));
need_schedule = RT_TRUE;
}
else
sem->value ++; /* increase value */
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* resume a thread, re-schedule */
if (need_schedule == RT_TRUE)
rt_schedule();//--------信号量释放完以后判断需要调度,在这执行调度器
return RT_EOK;
}
3. 总结
- RTT文档中:操作系统中最小的时间单位是时钟节拍 (OS Tick)的说法并无错误,因为我们可以执行的以时间为衡量的操作,都是以OS Tick为最小单位的。同时如果没有线程主动挂起的情况,仅以时间片为依照来调度,也是以OS Tick为最小单位的。
- 但如果存在线程主动挂起的情况,挂起时间无法确定,此时仅以OS Tick为单位调度,会造成CPU资源的极大浪费,同时降低系统的实时性。因此RTT在所有可以挂起线程的操作后,都添加了主动执行一次调度器的操作,直接执行当前已就绪的最高优先级的线程。
- OS Tick是系统调度的最小单位的理解更正为:系统调度会发生在OS Tick中断和当前线程主动挂起以及高优先级线程获取到资源时。
PS:RTT的内核封装的较好,代码易读性高,之前也经常翻内核代码发现并不复杂,也比较易懂。如果大家在开发过程中有对RTT内核机制、逻辑的疑惑,建议先自己动手翻下底层代码,或许可以更快的解决问题。
以上是关于[RTT] RT-Thread线程调度机制线程切换时机的主要内容,如果未能解决你的问题,请参考以下文章