linux 内核并发同步 2

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了linux 内核并发同步 2相关的知识,希望对你有一定的参考价值。

信号量semaphore

信号量是一种允许进程进入睡眠的同步机制,信号量是一个计数器,支持两种原语即P 和V操作,也就是down 和up 操作,

/* Please dont access any members of this structure directly */
struct semaphore
raw_spinlock_t lock;//用于对count以及wait_list成员的保护
unsigned int count;//表示允许进入临界区的控制路径
struct list_head wait_list;//用于管理所有在该信号上睡眠的进程
;
#define __SEMAPHORE_INITIALIZER(name, n) \\
\\
.lock = __RAW_SPIN_LOCK_UNLOCKED((name).lock), \\
.count = n, \\
.wait_list = LIST_HEAD_INIT((name).wait_list), \\


#define DEFINE_SEMAPHORE(name) \\
struct semaphore name = __SEMAPHORE_INITIALIZER(name, 1)

static inline void sema_init(struct semaphore *sem, int val)

static struct lock_class_key __key;
*sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
lockdep_init_map(&sem->lock.dep_map, "semaphore->lock", &__key, 0);

信号量的操作:

void down(struct semaphore *sem);
int __must_check down_interruptible(struct semaphore *sem);
int __must_check down_killable(struct semaphore *sem);
int __must_check down_trylock(struct semaphore *sem);
int __must_check down_timeout(struct semaphore *sem, long jiffies);
void up(struct semaphore *sem); 1 /** 2 * down_interruptible - acquire the semaphore unless interrupted 3 * @sem: the semaphore to be acquired
4  *
5 * Attempts to acquire the semaphore. If no more tasks are allowed to
6 * acquire the semaphore, calling this function will put the task to sleep.
7 * If the sleep is interrupted by a signal, this function will return -EINTR.
8 * If the semaphore is successfully acquired, this function returns 0.
9 */
10 int down_interruptible(struct semaphore *sem)
11
12 unsigned long flags;
13 int result = 0;
14
15 raw_spin_lock_irqsave(&sem->lock, flags);
16 if (likely(sem->count > 0))
17 sem->count--;
18 else
19 result = __down_interruptible(sem);
20 raw_spin_unlock_irqrestore(&sem->lock, flags);
21
22 return result;
23
24 static noinline int __sched __down_interruptible(struct semaphore *sem)
25
26 return __down_common(sem, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
27
28 /*
29 * Because this function is inlined, the state parameter will be
30 * constant, and thus optimised away by the compiler. Likewise the
31 * timeout parameter for the cases without timeouts.
32 */
33 static inline int __sched __down_common(struct semaphore *sem, long state,
34 long timeout)
35
36 struct task_struct *task = current;
37 struct semaphore_waiter waiter;
38
39 list_add_tail(&waiter.list, &sem->wait_list);//添加到末尾 fifo 先进先出
40 waiter.task = task;
41 waiter.up = false;
42
43 for (;;)
44 if (signal_pending_state(state, task))//收到信号且不是sigkill
45 goto interrupted;
46 if (unlikely(timeout <= 0))//超时唤醒
47 goto timed_out;
48 __set_task_state(task, state);
49 raw_spin_unlock_irq(&sem->lock);
50 timeout = schedule_timeout(timeout);
51 raw_spin_lock_irq(&sem->lock);
52 if (waiter.up)//up 为true 表示被唤醒切获取到自旋锁
53 return 0;
54
55
56 timed_out:
57 list_del(&waiter.list);
58 return -ETIME;
59
60 interrupted:
61 list_del(&waiter.list);
62 return -EINTR;
63

可知16-19行为临界区,涉及到count 的计数 会使用spinlock 来保护,由于这些信号量可能在中断函数里面使用,所以需要关闭本地cpu 中断,这里采用raw_spin_lock_irqsave,如果count值小于0 表示当前进程无法获取该信号量使用down_interrupt来执行睡眠等待操作,

49-50行:主动让出cpu之前需要释放该锁,

1 /**
2 * up - release the semaphore
3 * @sem: the semaphore to release
4 *
5 * Release the semaphore. Unlike mutexes, up() may be called from any
6 * context and even by tasks which have never called down().
7 */
8 void up(struct semaphore *sem)
9
10 unsigned long flags;
11
12 raw_spin_lock_irqsave(&sem->lock, flags);
13 if (likely(list_empty(&sem->wait_list)))
14 sem->count++;
15 else
16 __up(sem);
17 raw_spin_unlock_irqrestore(&sem->lock, flags);
18
19 static noinline void __sched __up(struct semaphore *sem)
20
21 struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
22 struct semaphore_waiter, list);
23 list_del(&waiter->list);
24 waiter->up = true;
25 wake_up_process(waiter->task);
26
27 /**
28 * wake_up_process - Wake up a specific process
29 * @p: The process to be woken up.
30 *
31 * Attempt to wake up the nominated process and move it to the set of runnable
32 * processes.
33 *
34 * Return: 1 if the process was woken up, 0 if it was already running.
35 *
36 * It may be assumed that this function implies a write memory barrier before
37 * changing the task state if and only if any tasks are woken up.
38 */
39 int wake_up_process(struct task_struct *p)
40
41 return try_to_wake_up(p, TASK_NORMAL, 0);
42

可知:如果信号量上的等待队列为空,则说明没有进程等待该变量,那么只需要将count++,如果不为空则有进程在等待队列休眠,需要up叫醒他们。up函数中唤醒进程是先进先出。

MUTEX 互斥体

mutex是阻塞锁,当某线程无法获取互斥量时,该线程会被直接挂起,该线程不再消耗CPU时间,当其他线程释放互斥量后,操作系统会激活那个被挂起的线程,让其投入运行.

问题:mutex 互斥体解决了什么问题?,mutex 和信号量的区别?

信号量可以根据初始化的count值大小分为计数信号量和互斥信号量。mutex 相当于count计数等于1的信号量,为什么还有mutex呢?

猜测原因:应该是在锁竞争激烈的场景下,mutex 执行更快吧,


A ​​mutex​​​ is essentially the same thing as a binary semaphore and sometimes uses
the same basic implementation. The differences between them are in how
they are used. While a binary semaphore may be used as a mutex, a mutex
is a more specific use-case, in that only the thread that locked the
mutex is supposed to unlock it. This constraint makes it possible to
implement some additional features in mutexes:

  1. Since only the thread that locked the mutex is supposed to unlock
    it, a mutex may store the id of thread that locked it and verify the
    same thread unlocks it.
  2. Mutexes may provide​​priority inversion​​​ safety. If the mutex knows who locked it and is supposed to unlock it,
    it is possible to promote the priority of that thread whenever a
    higher-priority task starts waiting on the mutex.
  3. Mutexes may also provide deletion safety, where the thread holding the mutex cannot be accidentally deleted.
  4. Alternately, if the thread holding the mutex is deleted (perhaps due
    to an unrecoverable error), the mutex can be automatically released.
  5. A mutex may be recursive: a thread is allowed to lock it multiple times without causing a deadlock.

 

/*
* Simple, straightforward mutexes with strict semantics:
*
* - only one task can hold the mutex at a time
* - only the owner can unlock the mutex
* - multiple unlocks are not permitted
* - recursive locking is not permitted
* - a mutex object must be initialized via the API
* - a mutex object must not be initialized via memset or copying
* - task may not exit with mutex held
* - memory areas where held locks reside must not be freed
* - held mutexes must not be reinitialized
* - mutexes may not be used in hardware or software interrupt
* contexts such as tasklets and timers
*
* These semantics are fully enforced when DEBUG_MUTEXES is
* enabled. Furthermore, besides enforcing the above rules, the mutex
* debugging code also implements a number of additional features
* that make lock debugging easier and faster:
*
* - uses symbolic names of mutexes, whenever they are printed in debug output
* - point-of-acquire tracking, symbolic lookup of function names
* - list of all locks held in the system, printout of them
* - owner tracking
* - detects self-recursing locks and prints out all relevant info
* - detects multi-task circular deadlocks and prints out all affected
* locks and tasks (and only those tasks)
*/
struct mutex
/* 1: unlocked, 0: locked, negative: locked, possible waiters */
atomic_t count;//原子计数 1表示没有人持有锁,0 表示被持有
spinlock_t wait_lock; //用于保护wait_list睡眠等待队列
struct list_head wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
struct task_struct *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
;
Spinner MCS lock : mcs 一种spinlock的优化方案

mutex 实现:
1 /**
2 * mutex_lock - acquire the mutex
3 * @lock: the mutex to be acquired
4 *
5 * Lock the mutex exclusively for this task. If the mutex is not
6 * available right now, it will sleep until it can get it.
7 *
8 * The mutex must later on be released by the same task that
9 * acquired it. Recursive locking is not allowed. The task
10 * may not exit without first unlocking the mutex. Also, kernel
11 * memory where the mutex resides must not be freed with
12 * the mutex still locked. The mutex must first be initialized
13 * (or statically defined) before it can be locked. memset()-ing
14 * the mutex to 0 is not allowed.
15 *
16 * ( The CONFIG_DEBUG_MUTEXES .config option turns on debugging
17 * checks that will enforce the restrictions and will also do
18 * deadlock debugging. )
19 *
20 * This function is similar to (but not equivalent to) down().
21 */
22 void __sched mutex_lock(struct mutex *lock)
23
24 might_sleep();
25 /*
26 * The locking fastpath is the 1->0 transition from
27 * unlocked into locked state.
28 */
29 __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
30 mutex_set_owner(lock);//成功持有锁后 lock->owner 只想当前进程的task_struct 当前线程的thread_info可以通过sp低13位清零获取到

eg:如果count计数减1后小于0,表示该锁已经被人持有,这进入slowpath,  1 __mutex_lock_slowpath(atomic_t *lock_count)

2 
3 struct mutex *lock = container_of(lock_count, struct mutex, count);
4
5 __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
6 NULL, _RET_IP_, NULL, 0);
7
8
9 /*
10 * Lock a mutex (possibly interruptible), slowpath:
11 */
12 static __always_inline int __sched
13 __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
14 struct lockdep_map *nest_lock, unsigned long ip,
15 struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
16
17 struct task_struct *task = current;// 通过sp低13位清零获取到thread_info,再通过thread_info获取到task指针
18 struct mutex_waiter waiter;
19 unsigned long flags;
20 int ret;
21
22 if (use_ww_ctx)
23 struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
24 if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
25 return -EALREADY;
26
27
28 preempt_disable();
29 mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
30
31 if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx))
32 /* got the lock, yay! */
33 preempt_enable();
34 return 0;
35
36
37 spin_lock_mutex(&lock->wait_lock, flags);// 获取lock->wait_lock自旋锁,直到获取成功为止,因为接下来的代码要对lock的成员变量进行读写,需要互斥访问
38
39 /*
40 * Once more, try to acquire the lock. Only try-lock the mutex if
41 * it is unlocked to reduce unnecessary xchg() operations.
42 */ // 循环读取互斥锁计数器count的值,直到count为1,表示互斥锁可用,此处读取互斥锁计数器的同时也将互斥锁的值设置为0了
43 if (!mutex_is_locked(lock) &&
44 (atomic_xchg_acquire(&lock->count, 0) == 1))
45 goto skip_wait;
46
47 debug_mutex_lock_common(lock, &waiter);
48 debug_mutex_add_waiter(lock, &waiter, task_thread_info(task));
49
50 /* add waiting tasks to the end of the waitqueue (FIFO): */
51 list_add_tail(&waiter.list, &lock->wait_list);
52 waiter.task = task;
53
54 lock_contended(&lock->dep_map, ip);
55
56 for (;;)
57 /*
58 * Lets try to take the lock again - this is needed even if
59 * we get here for the first time (shortly after failing to
60 * acquire the lock), to make sure that we get a wakeup once
61 * its unlocked. Later on, if we sleep, this is the
62 * operation that gives us the lock. We xchg it to -1, so
63 * that when we release the lock, we properly wake up the
64 * other waiters. We only attempt the xchg if the count is
65 * non-negative in order to avoid unnecessary xchg operations:
66 */
67 if (atomic_read(&lock->count) >= 0 &&
68 (atomic_xchg_acquire(&lock->count, -1) == 1))
69 break;
70
71 /*
72 * got a signal? (This code gets eliminated in the
73 * TASK_UNINTERRUPTIBLE case.)
74 */
75 if (unlikely(signal_pending_state(state, task)))
76 ret = -EINTR;
77 goto err;
78
79
80 if (use_ww_ctx && ww_ctx->acquired > 0)
81 ret = __ww_mutex_lock_check_stamp(lock, ww_ctx);
82 if (ret)
83 goto err;
84
85 // 设置当前任务状态TASK_UNINTERRUPTIBLE
86 __set_task_state(task, state);
87
88 /* didnt get the lock, go to sleep: */
89 spin_unlock_mutex(&lock->wait_lock, flags); // 释放自旋锁(此处对lock的成员变量修改读写都已经完成,其他任务可读写修改
90 schedule_preempt_disabled();/ 执行一次调度,主动切换到其他任务,等其他任务释放互斥锁是会唤醒当前任务,继续执行下面的函数
91 spin_lock_mutex(&lock->wait_lock, flags);// 任务被唤醒,重新获取自旋锁
92
93 __set_task_state(task, TASK_RUNNING);
94
95 mutex_remove_waiter(lock, &waiter, current_thread_info());
96 /* set it to 0 if there are no waiters left: */
97 if (likely(list_empty(&lock->wait_list)))
98 atomic_set(&lock->count, 0);
99 debug_mutex_free_waiter(&waiter);
100
101 skip_wait:
102 /* got the lock - cleanup and rejoice! */
103 lock_acquired(&lock->dep_map, ip);
104 mutex_set_owner(lock);
105
106 if (use_ww_ctx)
107 struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
108 ww_mutex_set_context_slowpath(ww, ww_ctx);
109
110
111 spin_unlock_mutex(&lock->wait_lock, flags);
112 preempt_enable();
113 return 0;
114
115 err:
116 mutex_remove_waiter(lock, &waiter, task_thread_info(task));
117 spin_unlock_mutex(&lock->wait_lock, flags);
118 debug_mutex_free_waiter(&waiter);
119 mutex_release(&lock->dep_map, 1, ip);
120 preempt_enable();
121 return ret;
122

就不详细看了 ;

主要是mutex 和semaphore使用区别:一个是互斥一个 计数信号量

以生活现象为例:超市最多允许同时进100个人,一开始100人顺利进入,后面出一个进一个

// 通过sp低13位清零获取到thread_info,再通过thread_info获取到task指针

http代理服务器(3-4-7层代理)-网络事件库公共组件、内核kernel驱动 摄像头驱动 tcpip网络协议栈、netfilter、bridge 好像看过!!!! 但行好事 莫问前程 --身高体重180的胖子



以上是关于linux 内核并发同步 2的主要内容,如果未能解决你的问题,请参考以下文章

Linux(内核剖析):28---内核同步之(临界区竞争条件同步锁常见的内核并发SMNP和UP配置选项锁的争用和扩展性(锁粒度))

《Linux内核设计与实现》读书笔记- 内核同步介绍

(笔记)Linux内核学习之并发和同步概念

重温Linux内核:互斥和同步

Linux内核同步原语之原子操作

Linux并发与同步专题