等待队列源码分析

Posted Linux内核之旅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了等待队列源码分析相关的知识,希望对你有一定的参考价值。

正如list_head结构那样,等待队列(wait queue)作为linux内核中的基础数据结构,与进程调度紧密结合在一起;在驱动程序中,常常使用等待队列来实现进程的阻塞和进程的唤醒。因此,我们很有必要对它的内部实现进行分析。

0.数据结构

一般我们的链式线性表都会有一个头结点,以使我们迅速找到这个线性链表的“领导”。在等待队列中,同样有队列头,只不过等待队列头和普通的等待队列结点定义有所不同。

1 //yoursource/include/linux/wait.h
2 struct __wait_queue_head {
3          spinlock_t lock;
4          struct list_head task_list;
5  };
6  typedef struct __wait_queue_head wait_queue_head_t;

可以看到,等待队列头结构中封装了list_head结构。这么做是可以想象到的,因为队列和栈本质上还是双联表。当我们适当限制双联表的某些操作时,就可以实现这样的功能。另外,等待队列头结构中还有一个自旋锁结构的变量lock,它起到了对等待队列进行互斥操作的作用。

在等待队列结构中,除了使用list_head结构外,还有以下几个字段:

1 typedef struct __wait_queue wait_queue_t;
2 struct __wait_queue {
3         unsigned int flags;
4 #define WQ_FLAG_EXCLUSIVE       0x01
5         void *private;
6         wait_queue_func_t func;
7         struct list_head task_list;
8 };

flag:指明该等待的进程是互斥还是非互斥,为0时非互斥,为1时互斥;
WQ_FLAG_EXCLUSIVE :从变量可以看出,此宏代表进程是互斥的;
private:void型指针变量功能强大,你可以赋给它你需要的结构体指针。一般赋值为task_struct类型的指针,也就是说指向一个进程;
func:函数指针,指向该等待队列项的唤醒函数;

我们可以通过下述图来详细了解等待队列头和等待队列的结构关系:

1.定义及初始化

内核中使用init_waitqueue_head宏来初始化等待队列头。

1 //yourLinuxSourceDir/include/linux/wait.h
2   #define init_waitqueue_head(q)                          \
3           do {                                            \
4                   static struct lock_class_key __key;     \
5                                                           \
6                   __init_waitqueue_head((q), &__key);     \
7           } while (0)

我们可以看到这个宏使用了do-while型循环语句,里面包含两条语句。首先定义了一个变量__key,然后再调用init_waitqueue_head函数。

事实上,这个do-while循环语句只会执行一次。那么,为什么要选择使用这个循环语句?在定义宏的时候将上述语句嵌套在一个大括号里也可以啊!可能我们如下那样使用一个宏:

1          if(conditon)
2                  init_waitqueue_head(q);
3          else
4                  do_somthing_else();

如果我们去除do-while,那么替换后会编译错误。因为宏末尾的分号使得else语句成为一个单独的句子。你也许会说,那我这样使用:init_waitqueue_head(q)就可以避免这个错误了。这样是可以,但是对于那些初学者来说,很难避免他们要加上一个;。并且,在茫茫代码中,孤零零的出现一个没有;的语句,未免显得有些奇怪。

言归正传,在init_waitqueue_head宏中调用的__init_waitqueue_head函数定义如下:

1 void __init_waitqueue_head(wait_queue_head_t *q, struct lock_class_key *key)
2  {
3          spin_lock_init(&q->lock);
4          lockdep_set_class(&q->lock, key);
5          INIT_LIST_HEAD(&q->task_list);
6  }

在这个函数中,首先利用自旋锁初始化函数初始化这个自旋锁;在上述等待队列头的定义中,我们可以看到task_list字段是一个list_head结构类型,因此我们使用INIT_LIST_HEAD对这个链表进行初始化。这些过程都是我们所熟悉的。

同时,我们也可以使用初始化宏DECLARE_WAIT_QUEUE_HEAD一步的进行定义和初始化。

1 #define __WAIT_QUEUE_HEAD_INITIALIZER(name) {                           \
2         .lock           = __SPIN_LOCK_UNLOCKED(name.lock),              \
3         .task_list      = { &(name).task_list, &(name).task_list } }
4
5 #define DECLARE_WAIT_QUEUE_HEAD(name) \
6         wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)

上述代码对于有C基础的人理解起来并不困难。需要注意的是对task_list进行赋值后,这个结点的前后指针都会指向自己。

同样,对于等待队列,我们可以使用DECLARE_WAITQUEUE宏来定义并初始化一个等待队列项。

1 int default_wake_function(wait_queue_t *wait, unsigned mode, int flags, void *key);
2 #define __WAITQUEUE_INITIALIZER(name, tsk) {                            \
3         .private        = tsk,                                          \
4         .func           = default_wake_function,                        \
5         .task_list      = { NULL, NULL } }
6
7 #define DECLARE_WAITQUEUE(name, tsk)                                    \
8         wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)

name要定义的等待队列项的名称;tsk是task_struct类型的指针变量,它指向这个等待队列项所对应的进程。

2.添加/移除等待队列

add_wait_queue添加函数将等待队列wait添加到以q为等待队列头的那个等待队列链表中。

1 void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
2 {
3         unsigned long flags;
4
5         wait->flags &= ~WQ_FLAG_EXCLUSIVE;
6         spin_lock_irqsave(&q->lock, flags);
7         __add_wait_queue(q, wait);
8         spin_unlock_irqrestore(&q->lock, flags);
9 }

我们可以看到flags的结果必然是0,也就是说这个函数是将非互斥进程添加到等待队列当中。而且在调用具体的添加函数时候,使用关中断并保护现场的自旋锁方式使得添加操作每次只被一个进程访问。

具体的添加过程是将当前进程所对应的等待队列结点wait添加到等待队列头结点q之后。具体来说,就是将new->task_list结点添加到以head->task_list为头指针的双链表当中。另外,通过add_wait_queue_exclusive函数可以将一个互斥进程添加到等待队列当中。从添加过程可以发现,add_wait_queue函数会将非互斥进程添加到等待队列的前部。

1 static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
2 {
3         list_add(&new->task_list, &head->task_list);
4 }

另外, add_wait_queue_exclusive添加函数则会将互斥进程添加到等待队列的末尾。

1 void add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t *wait)
2 {
3         unsigned long flags;
4
5         wait->flags |= WQ_FLAG_EXCLUSIVE;
6         spin_lock_irqsave(&q->lock, flags);
7         __add_wait_queue_tail(q, wait);
8         spin_unlock_irqrestore(&q->lock, flags);
9 }

remove_wait_queue函数用于将等待队列项wait从以q为等待队列头的等待队列中移除,源码如下。

01 void remove_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
02  {
03          unsigned long flags;
04  
05          spin_lock_irqsave(&q->lock, flags);
06          __remove_wait_queue(q, wait);
07          spin_unlock_irqrestore(&q->lock, flags);
08  }
09 static inline void __remove_wait_queue(wait_queue_head_t *head,
10                                                         wait_queue_t *old)
11 {
12         list_del(&old->task_list);
13 }

有了上述的基础,那么移除函数就简单了许多。

3.在等待队列上睡眠

如何实现进程的阻塞?大致过程就是将当前进程的状态设置成睡眠状态,然后将这个进程加入到等待队列即可。在linux内核中有一组函数接口来实现这个功能。

01 void __sched interruptible_sleep_on(wait_queue_head_t *q)
02 {
03         sleep_on_common(q, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
04 }
05 EXPORT_SYMBOL(interruptible_sleep_on);
06
07 long __sched
08 interruptible_sleep_on_timeout(wait_queue_head_t *q, long timeout)
09 {
10         return sleep_on_common(q, TASK_INTERRUPTIBLE, timeout);
11 }
12 EXPORT_SYMBOL(interruptible_sleep_on_timeout);
13
14 void __sched sleep_on(wait_queue_head_t *q)
15 {
16         sleep_on_common(q, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
17 }
18 EXPORT_SYMBOL(sleep_on);
19
20 long __sched sleep_on_timeout(wait_queue_head_t *q, long timeout)
21 {
22         return sleep_on_common(q, TASK_UNINTERRUPTIBLE, timeout);
23 }
24 EXPORT_SYMBOL(sleep_on_timeout);

通过上述源码,你可以发现这些函数在内部都调用了sleep_on_common函数,通过传递不同的值来实现不同的功能。而这个通用函数的三个参数分别关注的是:进程要加入到那个等待队列?进程是那种睡眠状态(TASK_UNINTERRUPTIBLE还是TASK_INTERRUPTIBLE)?进程睡眠的时间?

01 static long __sched
02 sleep_on_common(wait_queue_head_t *q, int state, long timeout)
03 {
04         unsigned long flags;
05         wait_queue_t wait;
06
07         init_waitqueue_entry(&wait, current);
08
09         __set_current_state(state);
10
11         spin_lock_irqsave(&q->lock, flags);
12         __add_wait_queue(q, &wait);
13         spin_unlock(&q->lock);
14         timeout = schedule_timeout(timeout);
15         spin_lock_irq(&q->lock);
16         __remove_wait_queue(q, &wait);
17         spin_unlock_irqrestore(&q->lock, flags);
18
19         return timeout;
20 }

在此函数中,首先定义了一个等待队列项结点,通过 init_waitqueue_entry函数对其进行初始化。可以从下述初始化源码中看到,此时该等待队列项指向当前当前进程。而唤醒函数指针func则指向内核自定义的一个默认唤醒函数default_wake_function。

1 static inline void init_waitqueue_entry(wait_queue_t *q, structtask_struct *p)
2  {
3         q->flags = 0;
4         q->private = p;
5         q->func = default_wake_function;
6  }

初始化完毕后,通过__set_current_state函数将当前进程的状态设置成state。接着,在自旋锁的保护下,将当前进程对应的等待队列结点插入到等待队列链表当中。更重要的是,在schedule_timeout函数中不仅要设置进程的睡眠时间(以jiffies为单位的),还要使用schedule函数进行重新调度。一旦使用了schedule函数后,也就意味这当前这个进程真正的睡眠了,那么接下来的代码会在它唤醒后执行。当该进程被唤醒后(资源可用时),会从等待队列中将自己对应的那个等待队列结点wait移除。

上述过程都是在自旋锁保护下进行的,并且在整个执行过程中不可被其他中断所打断。现在再回过头去看一开始的那四个睡眠函数接口,你就明白了它们各自的不同之处了。

4.唤醒函数

唤醒函数会唤醒以x为头结点的等待队列中的等待队列项所对应的进程。与睡眠函数类似,内核中也有一组函数可以对阻塞的进程进行唤醒。

1 #define wake_up(x)                      __wake_up(x, TASK_NORMAL, 1, NULL)
2 #define wake_up_nr(x, nr)               __wake_up(x, TASK_NORMAL, nr, NULL)
3 #define wake_up_all(x)                  __wake_up(x, TASK_NORMAL, 0, NULL)
4 #define wake_up_interruptible(x)        __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)


5 #define wake_up_interruptible_nr(x, nr) __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL)


6 #define wake_up_interruptible_all(x)    __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)


通过上述代码,我们可以发现这些唤醒函数均调用了__wake_up函数。__wake_up函数的四个参数分别指:头结点指针、唤醒进程的类型、唤醒进程的数量和一个附加的void型指针变量。

1 void __wake_up(wait_queue_head_t *q, unsigned int mode,
2                         int nr_exclusive, void *key)
3 {
4         unsigned long flags;
5
6         spin_lock_irqsave(&q->lock, flags);
7         __wake_up_common(q, mode, nr_exclusive, 0, key);
8         spin_unlock_irqrestore(&q->lock, flags);
9 }

在__wake_up函数又通过传递不同的参数调用__wake_up_common函数来实现不同的唤醒功能。

01 static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
02                         int nr_exclusive, int wake_flags, void *key)
03 {
04         wait_queue_t *curr, *next;
05
06         list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
07                 unsigned flags = curr->flags;
08
09                 if (curr->func(curr, mode, wake_flags, key) &&
10                                 (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
11                         break;
12         }
13 }

list_for_each_entry_safe函数将遍历整个等待队列中的链表,通过每次的逻辑判断来唤醒相应的进程。这个if语句初看起来有点麻烦,不过我们一点一点的将它拆解。

curr->func(curr, mode, sync, key):即执行默认的唤醒函数,将指定的进程curr以mode方式唤醒。成功唤醒返回1;否则,返回0;
(flags & WQ_FLAG_EXCLUSIVE):判断当前进程是否以互斥形式唤醒。是互斥形式则返回1;否则返回0;
!–nr_exclusive:nr_exclusive为需要唤醒的互斥进程的数量。

这三个部分是通过逻辑与连接起来的。根据逻辑与的运算规则,只有当第一部分为真时才会判断第二部分的值,依次再判断第三部分的值。

通过上述的等待队列的添加过程我们知道,等待队列中前面均是非互斥进程,后面才是互斥进程。因此,唤醒函数总先唤醒全部的非互斥进程。因为当__wake_up_commom函数每一次去判断if语时,总会“不自觉”去执行默认的唤醒函数(除非唤醒失败,那么会退出遍历宏);当全部的非互斥进程被唤醒后,第二个判断条件也就成立了。因此__wake_up_commom函数会依次唤醒nr_exclusive个互斥进程;当–nr_exclusive为0时(!–nr_exclusive也就为真),整个遍历过程也恰好结束,而此时if语句的三个判断条件才刚好满足(这段代码太强大了!!!)。

5.有条件的睡眠

与睡眠函数不同,条件睡眠是指当某些条件发生时,这个进程才会加入到等待队列当中。关于条件睡眠有下列的宏:

1 wait_event(wq, condition)
2 wait_event_timeout(wq, condition, timeout)
3 wait_event_interruptible(wq, condition)
4 wait_event_interruptible_timeout(wq, condition, timeout)

关于条件睡眠,虽然函数实现与睡眠函数不同,但是基本思想是相似的,各位可以查找相应的源码进行分析。Try!


以上是关于等待队列源码分析的主要内容,如果未能解决你的问题,请参考以下文章

ReentrantLock源码分析

ReentrantLock源码分析

深入浅出Java并发编程指南「源码分析篇」透析ThreadPoolExecutor线程池运作机制和源码体系

ThreadX内核源码分析 - 消息队列

ThreadX内核源码分析 - 消息队列

浅谈AQS同步队列(含ReentrantLock加锁和解锁源码分析)