Linux中的spinlock机制 - CAS和ticket spinlock
Posted 宋宝华
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux中的spinlock机制 - CAS和ticket spinlock相关的知识,希望对你有一定的参考价值。
为什么要加锁
在SMP系统中,如果仅仅是需要串行地增加一个变量的值,那么使用原子操作的函数(API)就可以了。但现实中更多的场景并不会那么简单,比如需要将一个结构体A中的数据提取出来,然后格式化、解析,再添加到另一个结构体B中,这整个的过程都要求是「原子的」,也就是完成之前,不允许其他的代码来读/写这两个结构体中的任何一个。
这时,相对轻量级的原子操作API就无法满足这种应用场景的需求了,我们需要一种更强的同步/互斥机制,那就是软件层面的「锁」的机制。
同步锁的「加锁」和「解锁」是放在一段代码的一前一后,成对出现的,这段代码被称为Critical Section/Region(临界区)。但锁保护的并不是这段代码本身,而是其中使用到的多核/多线程共享的变量,它「同步」(或者说串行化)的是对这个变量的访问,通俗的语义就是“我有你就不能有,你有我就不会有”。
Linux中主要有两种同步锁,一种是spinlock,一种是mutex。spinlock和mutex都既可以在用户进程中使用,也可以在内核中使用,它们的主要区别是前者不会导致睡眠和调度,属于busy wait形式的锁,而后者可能导致睡眠和调度,属于sleep wait形式的锁。
spinlock是最基础的一种锁,像后面将要介绍的rwlock(读写锁),seqlock(读写锁)等都是基于spinlock衍生出来的。就算是mutex,它的实现与spinlock也是密不可分。因此,本系列文章将首先围绕spinlock展开介绍。
如何加锁
Linux中spinlock机制发展到现在,其实现方式的大致有3种。
【第一种实现 - 经典的CAS】
最古老的一种做法是:spinlock用一个整形变量表示,其初始值为1,表示available的状态。当一个CPU(设为CPU A)获得spinlock后,会将该变量的值设为0,之后其他CPU试图获取这个spinlock时,会一直等待,直到CPU A释放spinlock,并将该变量的值设为1。
那么其他的CPU是以何种形式等待的,如果有多个CPU一起等待,形成了竞争又该如何处理?这里要用到经典的CAS操作(Compare And Swap)。
谁和谁比较
目前,sh架构的Linux实现中还保留有这种经典的实现方法(相关代码位于/arch/sh/include/asm/spinlock-cas.h)。
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
while (!__sl_cas(&lock->lock, 1, 0));
}
static inline unsigned __sl_cas(volatile unsigned *p, unsigned old, unsigned new)
{
__asm__ __volatile__("cas.l %1,%0,@r0"
: "+r"(new)
: "r"(old), "z"(p)
: "t", "memory" );
return new;
}
其中,"p"指向spinlock变量所在的内存位置,存储的是当前spinlock实际的值,"old"存储的试图获取spinlock的本地CPU希望的值(1)。
不断地把「期望的值」和「实际的值」进行比较(compare),当它们相等时,说明持有spinlock的CPU已经释放了锁,那么试图获取spinlock的CPU就会尝试将"new"的值(0)写入"p"(swap),以表明自己成为spinlock新的owner。
汇编代码看起来可能略费力一些,用一段伪代码来展示或许会更加地直观:
function cas(p, old, new)
{
if *p ≠ old
do nothing
else
*p ← new
}
这里只用了0和1两个值来表示spinlock的状态,没有充分利用spinlock作为整形变量的属性,为此还有一种衍生的方法,可以判断当前spinlock的争用情况。具体规则是:每个CPU在试图获取一个spinlock时,都会将这个spinlock的值减1,所以这个值可以是负数,而「负」的越多(负数的绝对值越大),说明当前的争抢越激烈。
存在的问题
基于CAS的实现速度很快,尤其是在没有真正竞态的情况下(事实上大部分时候就是这种情况), 但这种方法存在一个缺点:它是「不公平」的。一旦spinlock被释放,第一个能够成功执行CAS操作的CPU将成为新的owner,没有办法确保在该spinlock上等待时间最长的那个CPU优先获得锁,这将带来延迟不能确定的问题。
【第二种实现 - Ticket Spinlock】
为了解决这种「无序竞争」带来的不公平问题,spinlock的另一种实现方法是采用排队形式的"ticket spinlock"。这里,我想展示ticket spinlock的两个实现版本,它们的原理都是一样的,只是具体细节略有差异。
ACRN版本
先来看下基于x86-64的ACRN hypervisor对于ticket spinlock的实现:
表示一个spinlock的数据结构由"head"和"tail"两个队列的索引组成。
typedef struct _spinlock {
uint32_t head;
uint32_t tail;
} spinlock_t;
"head"指向当前队列的头部,"tail"指向当前队列的尾部,其初始值都为0。
解锁
一个spinlock被owner释放时,该spinlock的head值会被owner通过"inc"指令加1。
static inline void spinlock_release(spinlock_t *lock)
{
asm volatile (" lock incl %[head]\\n" // head加1
:
: [head] "m" (lock->head)
: "cc", "memory");
}
加锁
其他CPU在试图获取这个spinlock时,会通过"xadd"指令将"tail"值保存在自己的eax寄存器中,然后将该spinlock的"tail"值加1(也就是将自己加到了这个等待队列的尾部)。
static inline void spinlock_obtain(spinlock_t *lock)
{
asm volatile (" movl $0x1,%%eax\\n" // eax = 1
" lock xaddl %%eax,%[tail]\\n" // eax = old tail, new tail = old tail + 1
" cmpl %%eax,%[head]\\n" // 比较eax(old tail)和head
" jz 1f\\n" // 相等,获得锁
"2: pause\\n" // 不相等,继续比较
" cmpl %%eax,%[head]\\n"
" jnz 2b\\n"
"1:\\n"
:
:
[head] "m"(lock->head),
[tail] "m"(lock->tail)
: "cc", "memory", "eax");
}
接下来就是不断的循环比较,判断该spinlock当前的"head"值,是否和自己存储在eax寄存器中的"tail"值相等,相等时则获得该spinlock,成为新的owner。
这类似于你去银行柜台办理业务,假设当前银行只有一个柜台,你需要在自助机上获得一个排队号码(相当于一个ticket),然后当柜台叫到的号码与你手中的号码一致时,你将坐上柜台前面的椅子,此时柜台为你服务,这也是这种实现方式被称为"ticket spinlock"的原因。
在ticket spinlock中,"compare"和"swap"的操作就分离了。把spinlock当前的值和旧的值进行比较(compare),还是由每个试图获得spinlock的CPU来执行的,但设置新的值(swap),则是由上一个持有spinlock的CPU来完成的。
Linux版本
再来看下基于ARMv6的Linux中,ticket spinlock的实现(相关代码位于/arch/arm/include/asm/spinlock.h):
static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
lock->tickets.owner++;
}
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
[LL/SC]
while (lockval.tickets.next != lockval.tickets.owner) {
wfe();
lockval.tickets.owner = READ_ONCE(lock->tickets.owner);
}
}
"owner"和"next"分别对应ACRN版本中的"head"和"tail"。"wfe"是ARM中的"wait for event"指令,和x86中的pause指令类似,目的是为了降低busy wait时的CPU功耗。
看起来比ACRN的实现简洁?没有,LL/SC这部分也是一段汇编代码,它完成的是和x86的"add"指令一样的工作,和前文讲述的LL/SC是一样的。
__asm__ __volatile__(
"1: ldrex %0, [%3]\\n" // lockval = lock->slock
" add %1, %0, %4\\n" // newval = lockval + (1 << TICKET_SHIFT)
" strex %2, %1, [%3]\\n" // try lock->slock = newval
" teq %2, #0\\n" // test result = 0 ?
" bne 1b" // not equal, do LL/SC again
: "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
: "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
: "cc");
这里之所以是"1<<TICKET_SHIFT"而不是1,是因为它没有把一个32位的变量全部用来表示spinlock的队列索引,而只是其中的一些bits,事实上也不可能有那么多的CPU同时等待一个spinlock。
公平与效率
可见,使用ticket spinlock可以让CPU按照到达的先后顺序,去获取spinlock的所有权,形成了「有序竞争」。根据硬件维护的cache一致性协议,如果spinlock的值没有更改,那么在busy wait时,试图获取spinlock的CPU,只需要不断地读取自己包含这个spinlock变量的cache line上的值就可以了,不需要从spinlock变量所在的内存位置读取。
但是,当spinlock的值被更改时,所有试图获取spinlock的CPU对应的cache line都会被invalidate,因为这些CPU会不停地读取这个spinlock的值,所以"invalidate"状态意味着此时,它们必须重新从内存读取新的spinlock的值到自己的cache line中。
而事实上,其中只会有一个CPU,也就是队列中最先达到的那个CPU,接下来可以获得spinlock,也只有它的cache line被invalidate才是有意义的,对于其他的CPU来说,这就是做无用功。内存比cache慢那么多,开销可不小。
还是用银行叫号来类比,假设现在2号客户的业务办理完了,接下来就该在大厅里叫3号,然后3号客户去办理,但是所有排号的,4号、5号……哪怕是20号,也得听一下叫的号,对于20号来说,它完全可以在叫到19号之前打个盹嘛。
每当一个spinlock的值出现变化时,所有试图获取这个spinlock的CPU都需要读取内存,刷新自己对应的cache line,而最终只有一个CPU可以获得锁,也只有它的刷新才是有意义的。锁的争抢越激烈(试图获取锁的CPU数目越多),无谓的开销也就越大。
【第三种实现 - MCS Lock】
如果在ticket spinlock的基础上进行一定的修改,让每个CPU不再是等待同一个spinlock变量,而是基于各自不同的per-CPU的变量进行等待,那么每个CPU平时只需要查询自己对应的这个变量所在的本地cache line,仅在这个变量发生变化的时候,才需要读取内存和刷新这条cache line,这样就可以解决上述的这个问题。
要实现类似这样的spinlock的「分身」,其中的一种方法就是使用MCS lock。试图获取一个spinlock的每个CPU,都有一份自己的MCS lock。
先来看下per-CPU的MCS lock是由哪些元素构造而成的(代码位于/kernel/locking/mcs_spinlock.h):
struct mcs_spinlock {
struct mcs_spinlock *next;
int locked;
};
每当一个CPU试图获取一个spinlock,它就会将自己的MCS lock加到这个spinlock的等待队列,成为该队列的一个节点(node),加入的方式是由该队列末尾的MCS lock的"next"指向这个新的MCS lock。
"locked"的值为1表示该CPU是spinlock当前的持有者,为0则表示没有持有。
加锁对于一个锁的实现来说,最核心的操作无非就是「加锁」和「解锁」。先来看下MCS lock的加锁过程是怎样的:
void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
// 初始化node
node->locked = 0;
node->next = NULL;
// 找队列末尾的那个mcs lock
struct mcs_spinlock *prev = xchg(lock, node);
// 队列为空,立即获得锁
if (likely(prev == NULL)) {
return;
}
// 队列不为空,把自己加到队列的末尾
WRITE_ONCE(prev->next, node);
// 等待lock的持有者把lock传给自己
arch_mcs_spin_lock_contended(&node->locked);
}
前面说过,加入队列的方式是添加到末尾(tail),所以首先需要知道这个「末尾」在哪里。函数的第一个参数"lock"就是指向这个末尾的指针,之所以是二级指针,是因为它指向的是末尾节点里的"next"域,而"next"本身是一个指向"struct mcs_spinlock"的一级指针。
第二个参数"node"是试图加锁的CPU对应的MCS lock节点。
"xchg()"的名称来源于x86的XCHG指令,其实现可简化表示成这样:
xchg(*ptr, x)
{
ret = *ptr;
*ptr = x;
return ret;
}
它干了两件事,一是给一个指针赋值,二是获取了这个指针在赋值前的值。
对应着上面的这个mcs_spin_lock(),通过xchg()获得的"prev"就是"*lock"最初的值(prev = *lock)。如果这个值为"NULL",说明队列为空,当前没有其他CPU持有这个spinlock,那么试图获取这个spinlock的CPU可以成功获得锁。同时,xchg()还让lock指向了这个持有锁的CPU的node(*lock = node)。
这里用了"likely()",意思是在大部分情况下,队列都是空的,说明现实的应用场景中,一个spinlock的争抢通常不会太激烈。前面说过,"locked"的值为1表示持有锁,可此刻CPU获取锁之后,竟然没有把自己node的"locked"值设为1?这是因为在队列为空的情况,CPU可以立即获得锁,不需要基于"locked"的值进行spin,所以此时"locked"的值是1还是0,根本就无所谓。除非是在debug的时候,需要查看当前持有锁的CPU,否则绝不多留一丝「赘肉」。
如果队列不为空,那么就需要把自己这个"node"加入等待队列的末尾,"WRITE_ONCE()"的作用是赋值,在这篇文章里已经介绍过了。
具体的等待过程是调用arch_mcs_spin_lock_contended(),它等待的,或者说"spin"的,是自己MCS lock里的"value"的值,直到这个值变为1。而将这个值设为1,是由它所在队列的前面那个node,在释放spinlock的时候完成的。
#define arch_mcs_spin_lock_contended(l)
do {
smp_cond_load_acquire(l, VAL);
} while (0)
解锁
那基于MCS lock的实现,释放一个spinlock的过程是怎样的呢?来看下面这个函数:
void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
{
// 找到等待队列中的下一个节点
struct mcs_spinlock *next = READ_ONCE(node->next);
// 当前没有其他CPU试图获得锁
if (likely(!next)) {
// 直接释放锁
if (likely(cmpxchg_release(lock, node, NULL) == node))
return;
// 等待新的node添加成功
while (!(next = READ_ONCE(node->next)))
cpu_relax();
}
// 将锁传给等待队列中的下一个node
arch_mcs_spin_unlock_contended(&next->locked);
}
两个参数的含义同mcs_spin_lock()类似,"lock"代表队尾指针,"node"是准备释放spinlock的CPU在队列中的MCS lock节点。
大概率还是没有锁的争抢,"next"为空,说明准备释放锁的CPU已经是该队列里的最后一个,也是唯一一个CPU了,那么很简单,直接将"lock"设为NULL就可以了。
"cmpxchg_release()"中的"release"代表这里包含了一个memory barrier。如果不考虑这个memory barrier,那么它的实现可简化表示成这样:
cmpxchg(*ptr, old, new)
{
ret = *ptr;
if (*ptr == old)
*ptr = new;
return ret;
}
跟前面讲到的"xchg()"差不多,也是先获取传入指针的值并作为函数的返回值,区别是多了一个compare。结合mcs_spin_unlock()来看,就是如果"*lock == node",那么"*lock = NULL"。
如果"*lock != node",说明当前队列中有等待获取锁的CPU……等一下,这不是和前面的代码路径相矛盾吗?其实不然,两个原因:
距离函数开头获得"next"指针的值已经过去一段时间了。
回顾前面加锁的过程,新的node加入是先让"*lock"指向自己,再让前面一个node的"next"指向自己。
所以,在这个时间间隔里,可能又有CPU把自己添加到队列里来了。于是,待新的node添加成功后,才可以通过arch_mcs_spin_unlock_contended()将spinlock传给下一个CPU。
#define arch_mcs_spin_unlock_contended(l)
smp_store_release((l), 1)
传递spinlock的方式,就是将下一个node的"locked"值设为1(next->locked = 1)。
如果在释放锁的一开始,等待队列就不为空,则"lock"指针不需要移动:
可以看到,无论哪种情况,在解锁的整个过程中,持有锁的这个CPU既没有将自己node中的"locked"设为0,也没有将"next"设为NULL,好像清理工作做的不完整?
事实上,这已经完全无所谓了,当它像「击鼓传花」一样把spinlock交到下一个node手里,它就等同于从这个spinlock的等待队列中移除了。多一事不如少一事,少2个无谓的步骤,效率又可以提升不少。
所以,分身之后的spinlock在哪里?它就在每个MCS lock的"locked"域里,像波浪一样地向前推动着。"locked"的值为1的那个node,才是spinlock的「真身」。
使用MCS lock,就实现了上文那个银行叫号的例子所提出的设想,对于20号来说,不用再听大堂的广播,让19号办理完业务告诉你就行了。
存在的问题
MCS lock的实现保留在了Linux的代码中,但是你却找不到任何一个地方调用了它的lock和unlock的函数。
因为相比起Linux中只占4个字节的ticket spinlock,MCS lock多了一个指针,要多占4(或者8)个字节,消耗的存储空间是原来的2-3倍。spinlock可是操作系统中使用非常广泛的数据结构,这多占的存储空间不可小视,而且spinlock常常会被嵌入到结构体中,对于像"struct page"这种对结构体大小极为敏感的,根本不可能直接使用MCS lock。
所以,真正在Linux中使用的,是下文将要介绍的,在MCS lock的基础上进行了改进的qspinlock。研究MCS lock的意义,不光是理解qspinlock的必经之路,从代码的角度,可以看出其极致精炼的设计,绝没有任何多余的步骤,值得玩味。
MCS lock可以解决在锁的争用比较激烈的场景下,cache line无谓刷新的问题,但它内含一个指针,所以更消耗存储空间,但这个指针又是不可或缺的,因为正是依靠这个指针,持有spinlock的CPU才能找到等待队列中的下一个节点,将spinlock传递给它。本文要介绍的qspinlock,其首要目标就是把原生的MCS lock结构体进行改进,「塞」进4字节的空间里。
【MCS Lock的改进 - qspinlock】
先来看一下有3个以上的CPU持有或试图获取spinlock时,等待队列的全貌:
可见,这个等待队列是由一个qspinlock和若干个MCS lock节点组成的,或者说由一个qspinlock加上一个MCS node的队列组成,其中MCS node的队列和上文描述的那个队列基本是一样的,所以我们重点先来看下这个qspinlock是如何构成的(定义在/include/asm-generic/qspinlock_types.h):
typedef struct qspinlock {
union {
atomic_t val;
#ifdef __LITTLE_ENDIAN
struct {
u8 locked;
u8 pending;
};
struct {
u16 locked_pending;
u16 tail;
};
#else
...
}
}
这里使用的是union,目的是既可以直接引用结构体的位域,又可以直接引用整个变量。如果处理器采用的是little endian,那么它们的内存排布关系如下图所示:
如果采用的是big endian,其内存排布则是这样的:
以32位系统为例,"val"作为一个32位的变量,包含了三个部分:"locked byte", "pending"和"tail","tail"又细分为"tail index"和"tail cpu":
下面来说一下这几个部分各自代表什么含义。
MCS lock中表示是否持有锁的"locked"占据了32个bits,但它其实只需要表示0和1两个值,在qspinlock中被压缩成了8个bits,即"locked byte"(其实只需要1个bit)。
此外,MCS lock的结构体中没有专门的标识等待队列末尾的元素,它使用的是一个全局的二级指针来指向队列末尾。而qspinlock使用的是一个per-CPU的数组来表示每个MCS node(用qnode结构体表示),通过CPU编号作为数组的index,就可以获得对应 MCS node的内存位置,因而qspinlock使用的是末尾 MCS node的CPU编号加1,即"tail cpu",来记录等待队列tail的位置(加1的原因将在后面解释)。
struct qnode {
struct mcs_spinlock mcs;
};
DEFINE_PER_CPU_ALIGNED(struct qnode, qnodes[MAX_NODES]);
除了14个bits的"tail cpu"(针对CPU数目小于16K的情况),还有2个bits是用作"tail index",这是因为Linux中一共有4种context,分别是task, softirq, hardirq和nmi,而一个CPU在一种context下,至多试图获取一个spinlock(原因将在下文给出),因而一个CPU至多同时试图获取4个spinlock,"tail index"就是用来标识context的编号的。
以上说的两点是qspinlock在MCS lock已有元素的基础上进行的改进,相比于MCS lock,qspinlock还新增了一个元素:占据1个bit的"pending",它的作用将随着本文叙述的推进,慢慢揭晓。
为了方便演示,我们把一个"val"对应的"locked", "pending"和"tail"作为一个三元组"(x,y,z)"来表示:
加锁
第一个CPU试图获取锁 - uncontended
三元组的初始值是(0, 0, 0),当第一个CPU试图获取锁时,可以立即成为该spinlock的owner,此时三元组的值变为(0, 0, 1):
对应的代码实现是queued_spin_lock()的前半部分(位于/include/asm-generic/qspinlock.h):
#define _Q_LOCKED_OFFSET 0
#define _Q_LOCKED_VAL (1U << _Q_LOCKED_OFFSET)
void queued_spin_lock(struct qspinlock *lock)
{
// (0,0,0) --> (0,0,1)
u32 val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL);
if (likely(val == 0))
return;
queued_spin_lock_slowpath(lock, val);
}
通过"cmpxchg()",和当前lock的"val"值进行比较,看是否为0,如果为0,那么获得spinlock,同时将lock的值设为1(即_Q_LOCKED_VAL)。
第二个CPU试图获取锁 - pending
在第一个CPU未释放锁之前,如果有第二个CPU试图获取锁,那么它必须等待,因而其获取锁的过程被称为"slow path",对应的代码实现是queued_spin_lock_slowpath()中的[part 1](该函数的完整实现在源代码中超过200行,本文是做了适当简化,并拆成几个部分来分开讲解)。
#define _Q_LOCKED_OFFSET 0
#define _Q_LOCKED_BITS 8
#define _Q_LOCKED_MASK ((1U << _Q_LOCKED_BITS) - 1) << _Q_LOCKED__OFFSET // (*,*,1)
// "val"是在queued_spin_lock中获取的当前lock的值
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
[part 2 - 第三个CPU试图获取spinlock]
...
goto queue; // 跳转到part 3
...
[part 1 - 第二个CPU试图获取spinlock]
// 设置pending位 (0,0,1) --> (0,1,1)
val = queued_fetch_set_pending_acquire(lock);
// 等待第一个CPU移交
if (val & _Q_LOCKED_MASK)
atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));
// 第二个CPU成功获取spinlock (0,1,0) --> (0,0,1)
clear_pending_set_locked(lock);
return;
[part 3 - 进入MCS lock队列]
queue:
...
[part 4 - 第三个CPU成功获取spinlock]
locked:
...
}
它首先通过queued_fetch_set_pending_acquire()函数设置"pending"为1,然后调用atomic_cond_read_acquire()函数,开始基于"locked byte"进行等待(或者说"spin"),此时三元组的值变为(0, 1, 1)。
像atomic_cond_read_acquire()这种函数,其实就是Linux中的原子操作这篇文章介绍的atomic_read()加上"cond"和"acquire",其中"cond"代表condition,表示spin基于的对象,而"acquire"用于设置Memory Barrier。这是为了保证应该在获取spinlock之后才能执行的代码,不要因为Memory Order的调换,在成功获取spinlock之前就执行了,那样就失去了保护Critical Section/Region的目的。
这第二个CPU可被视作是该spinlock的第一顺位继承人,可以做一个这样的类比:"locked byte"位域相当于是皇宫,而"pending"位域相当于是东宫,第二个CPU就是太子(假设按照立长的原则,先来后到),设置"pending"为1就相当于入主东宫,等待着继承大统。
皇帝大行之后(第一个CPU释放spinlock),皇位暂时空缺,三元组的的值变为(0, 1, 0)。之后,曾经的太子离开居住的东宫(将"pending"置0),进入皇宫,成为其新的主人(将"locked byte"置1)。这一过程对应的函数实现就是clear_pending_set_locked()。
第三个CPU试图获取锁 - uncontended queue
如果第二个CPU还在等待的时候,第三个CPU又来了,那么这第三个CPU就成了第二顺位继承人。它的等待路径的实现位于queued_spin_lock_slowpath()的[part 2]:
#define _Q_PENDING_OFFSET 8
#define _Q_PENDING_VAL (1U << _Q_PENDING_OFFSET) // (0,1,0)
[part 2A]
// (0,1,0)是spinlock移交的过渡状态
if (val == _Q_PENDING_VAL) {
// 等待移交完成 (0,1,0) --> (0,0,1)
int cnt = 1
val = atomic_cond_read_relaxed(&lock->val, (VAL != _Q_PENDING_VAL)|| !cnt--);
}
[part 2B]
// 已经有2个及以上的CPU持有或试图获取spinlock
if (val & ~_Q_LOCKED_MASK)
goto queue; // 进入MCS node队列
皇宫只有一个,东宫也只有一个,对于第二顺位继承人的王子来说,只能先去宫城外面自建府邸了。对应代码的实现就是[part 2B]的"goto queue",创建一个MCS node。
但前面提到,三元组的值为(0, 1, 0)时(即_Q_PENDING_VAL),是一个「政权移交」的过渡状态,此时第一顺位继承人即将成为spinlock新的持有者,那么作为第二顺位继承人,也即将递进成为第一顺位继承人,所以如果遇到了这个特殊时期,这第三个CPU会选择使用atomic_cond_read_relaxed()函数,先稍微等一会儿(即[part 2A]部分的代码)。
如果在它第二次读取"lock->val"的值时,已经不再是(0, 1, 0),说明移交已完成,变成了(0, 0, 1),那么它将绕过[part 2B],执行[part 1]的代码(也就是第二个CPU曾经走过的路)。
进入MCS node队列
(0, 1, 0)的状态毕竟是特例,大部分情况下,代码还是会跳转到名为"queue"的label位置,开始在MCS lock队列中的活动,对应的代码实现是queued_spin_lock_slowpath()的[part 3]。
首先是从前面提到的per-CPU的qnode[]数组中获取对应的MCS node,并初始化这个node,其中"idx"即表示context编号的"tail index"。
queue:
[part 3A]
struct mcs_spinlock *node = this_cpu_ptr(&qnodes[0].mcs);
int idx = node->count++;
node += idx;
node->locked = 0;
node->next = NULL;
然后是调用encode_tail()将自己的CPU编号和context编号「揉」进"tail"里,并通过xchg_tail()让qspinlock(即"lock")的"tail"指向自己。前面提到,"tail CPU"是CPU的编号加1,因为对于编号为0,tail index也为0的CPU,如果不加1,那么算出来的"tail"值就是0,而"tail"为0表示当前没有MCS node的队列。
[part 3B]
u32 tail = encode_tail(smp_processor_id(), idx);
// (p,1,1) -> (n,1,1)
u32 old = xchg_tail(lock, tail);
这里没有画出CPU C是基于哪个对象进行spin的,接下来的一节会给出答案。
第N个CPU试图获取锁 - contended queue
xchg_tail()除了在qspinlock中设置新的tail值,还返回了qspinlock中之前存储的tail值(即"old")。对于第N个(N > 3)试图获取spinlock的CPU,之前的tail值是指向上一个MCS node的,获得上一个node的目的是让它的"next"指向自己,这样才能加入MCS node的等待队列。
struct mcs_spinlock *prev, *next, *node;
[part 3C]
// 如果MCS node队列不为空 - 第N个CPU
if (old & _Q_TAIL_MASK) {
// 解析tail值,获取上一个MCS node
prev = decode_tail(old);
// 让上一个node指向自己,加入等待队列
WRITE_ONCE(prev->next, node); // prev->next = node
// 基于自己的locked值进行spin,在[part 4]中解除
arch_mcs_spin_lock_contended(&node->locked);
}
[part 3D]
#define _Q_LOCKED_PENDING_MASK (_Q_LOCKED_MASK | _Q_PENDING_MASK)
// 自己是MCS node队列的第一个节点 - 第二顺位继承
val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));
加入MCS node组成的队列后,这第N个CPU会基于自己node的"locked"进行spin,这就完全回到了上文描述的MCS lock的路径里了。
[part 3D]中的这个atomic_cond_read_acquire()函数,在本文已经是第二次出现了,上一次是在[part 1]里,所不同的是,[part 1]里是第二个CPU(第一顺位继承人,CPU B)基于代表「大位」的qspinlock的"locked byte"进行spin,而这里是第三个CPU(第二顺位继承人,CPU C)基于qspinlock的"locked byte"加上代表「嗣位」的"pending"进行spin。
也就是说,当第一顺位继承人获得spinlock,空出嗣位后,如果第二顺位继承人已经建立了MCS node,那它并不会马上占据嗣位,而是要等到CPU B也释放了spinlock,"locked byte"和"pending"都为0时,才直接获取spinlock。
[part 3D]是放在第N个CPU试图获取spinlock的代码位置之后的,说明第N个CPU在解除基于自己node的"locked"的spin之后,也会执行这段代码,原因在接下来介绍完[part 4]的代码之后,将会自然地浮出水面。
第三个CPU成功获取锁
经过等待,当qspinlock的"locked byte"和"pending"都变为0时,第三个CPU总算熬出头了,它终于可以结束spin,获取这个spinlock。
[part 4]
locked:
// 唯一node, 直接获取锁 (n,0,0) --> (0,0,1)
if (((val & _Q_TAIL_MASK) == tail) &&
atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
goto release;
// 如果MSC node队列中还有其他node (n,0,0) --> (n,0,1)
set_locked(lock);
// 解除下一个node在MCS node队列中的spin
arch_mcs_spin_unlock_contended(&node->next->locked);
release:
__this_cpu_dec(mcs_nodes[0].count);
如果qspinlock的"tail"是指向自己的,说明它是当前MCS node队列中唯一的node,那么直接通过atomic_try_cmpxchg_relaxed()获取spinlock即可,三元组的值变为(0, 0, 1)。
如果不是,说明队列中还有其他的node在等待,那么按照上文描述的MCS node队列的规则,需要调用arch_mcs_spin_unlock_contended()函数,设置下一个node(CPU D)的"locked"为1。
当CPU C获取spinlock,离开MCS node队列后,CPU D就成为了MCS node队列中的第一个node,它的"locked"也被CPU C设为了1。在上文描述的MCS lock实现中,"locked"为1就意味着spinlock被上一个owner递交给了自己。但在这里,"locked"为1只是结束了在MCS node队列中的的spin。
结束在MCS node队列中的的spin后,意味着CPU D在前面[part 3C]中的等待就结束了,它将执行[part 3D]的代码,切换为第二顺位继承人进行spin应该基于的对象,即qspinlock的"locked byte"加上"pending"(和之前的CPU C一样)。
可见,对于第N个(N > 3)CPU来说,从它加入MCS node等待队列,到最终成功获取spinlock,其spin基于的对象并不是从一而终的,在到达队首之前都是基于自己的node进行spin,到达队首后则是基于qspinlock进行spin。
解锁
相比起加锁,解锁的过程看起来简直简单地不像话:
void queued_spin_unlock(struct qspinlock *lock)
{
smp_store_release(&lock->locked, 0);
}
只需要把qspinlock中的"locked byte"置为0就可以了。这也不难理解,后面一直有CPU「盯」着呢,一旦大位空缺,马上就有人补齐,您只需要乖乖地把位子腾出来就行。
到这里,qspinlock加锁和解锁的过程就介绍完了,可以借助下面这张图片来一览它的全貌:
从中可以看出各种状态下CPU进行spin时基于的对象,以及状态之间的变化和迁移。
【总结与思考】
回过头来看一下qspinlock的组成和实现:如果只有1个或2个CPU试图获取锁,那么只需要一个4字节的qspinlock就可以了,其所占内存的大小和ticket spinlock一样。当有3个以上的CPU试图获取锁,需要一个qspinlock加上(N-2)个MCS node。
对于设计合理的spinlock,在大多数情况下,锁的争抢都不应该太激烈,最大概率是只有1个CPU试图获得锁,其次是2个,并依次递减。
这也是qspinlock中加入"pending"位域的意义,如果是两个CPU试图获取锁,那么第二个CPU只需要简单地设置"pending"为1,而不用「另起炉灶」创建一个MCS node。这是除了体积减小外,qspinlock相较于MCS lock所做的第二层优化。
试图加锁的CPU数目超过3个是小概率事件,但一旦发生,使用ticket spinlock机制就会造成多个CPU的cache line无谓刷新的问题,而qspinlock可以利用MCS node队列来解决这个问题。
可见,使用qspinlock机制来实现spinlock,具有很好的可扩展性,也就是无论当前锁的争抢程度如何,性能都可以得到保证。所以在Linux中,如果针对某个处理器架构的代码没有单独定义,qspinlock将是spinlock默认的实现方式。
#define arch_spin_lock(l) queued_spin_lock(l)
作为SMP系统中的一个高频操作,spinlock的性能毫无疑问会影响到整个系统运行的性能,所以,从ticket spinlock到MCS lock,再到qspinlock,见证了内核开发者对其一步步的优化历程。在优秀的性能背后,是对每一行代码的反复推敲和考量,最终铸就了qspinlock精妙但并不简单的实现过程。也许,它还有优化的空间……
至此,对Linux中不同处理器架构可能采用的三种spinlock的实现方式的介绍就告一段落了,下文将开始介绍Linux中spinlock的具体使用方法。
参考:
en.wikipedia.org/wiki/C
LWN - Ticket spinlocks
以上是关于Linux中的spinlock机制 - CAS和ticket spinlock的主要内容,如果未能解决你的问题,请参考以下文章