[OC学习笔记]多线程之GCD

Posted Billy Miracle

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[OC学习笔记]多线程之GCD相关的知识,希望对你有一定的参考价值。

串行队列和并发队列的源码解析

在我们的开发过程中,使用队列的时候,苹果给我们给了3个获取队列的方法:

//主队列
dispatch_queue_t mainQueue = dispatch_get_main_queue();
//全局并发队列
dispatch_queue_t globalQueue = dispatch_get_global_queue(0, 0);
//自己创建的队列
dispatch_queue_t normalQueue = dispatch_queue_create("com.test.serial", DISPATCH_QUEUE_SERIAL);

dispatch_get_main_queue

打开源码,可以在queue.h里面找到对应的代码:

dispatch_queue_main_t
dispatch_get_main_queue(void)

	return DISPATCH_GLOBAL_OBJECT(dispatch_queue_main_t, _dispatch_main_q);

接下来找DISPATCH_GLOBAL_OBJECT

#define DISPATCH_GLOBAL_OBJECT(type, object) ((type)&(object))
// dispatch_queue_main_t & _dispatch_main_q 

可以得出类型是dispatch_queue_main_t,对象是_dispatch_main_q,继续搜索_dispatch_main_q

struct dispatch_queue_static_s _dispatch_main_q = 
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_main),
#if !DISPATCH_USE_RESOLVERS
	.do_targetq = _dispatch_get_default_queue(true),
#endif
	.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
			DISPATCH_QUEUE_ROLE_BASE_ANON,
	.dq_label = "com.apple.main-thread",
	.dq_atomic_flags = DQF_THREAD_BOUND | DQF_WIDTH(1),
	.dq_serialnum = 1,
;

可以看到:主队列的lable = com.apple.main-threaddq_serialnum = 1就说明是一个串行队列,那么队列是怎么创建的呢?我们知道这里用到了一个函数dispatch_queue_create,接下来就先探索一下吧。

dispatch_queue_create

我们再来看一看如何创建队列,打开源码,找到dispatch_queue_create

// queue.c
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)

	return _dispatch_lane_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT, true);

_dispatch_lane_create_with_target

可以看到是调用_dispatch_lane_create_with_target并添加了2个默认参数实现的,找到它的对应实现:

static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
		dispatch_queue_t tq, bool legacy)

	dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);

	// 优先级的处理
	// Step 1: Normalize arguments (qos, overcommit, tq)
	//

	...

	// 初始化queue
	// Step 2: Initialize the queue
	//

	...
	
	// 申请和开辟内存
	dispatch_lane_t dq = _dispatch_object_alloc(vtable,
			sizeof(struct dispatch_lane_s));
	// 构造函数初始化  dqai.dqai_concurrent:是否是并发
	_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
			DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
			(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

	dq->dq_label = label;
	dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
			dqai.dqai_relpri);
	if (overcommit == _dispatch_queue_attr_overcommit_enabled) 
		dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
	
	if (!dqai.dqai_inactive) 
		_dispatch_queue_priority_inherit_from_target(dq, tq);
		_dispatch_lane_inherit_wlh_from_target(dq, tq);
	
	_dispatch_retain(tq);
	dq->do_targetq = tq;
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_trace_queue_create(dq)._dq;

第一行中的_dispatch_queue_attr_to_info方法里面,把我们传入的DISPATCH_QUEUE_SERIAL或者DISPATCH_QUEUE_CONCURRENT参数进行封装,封装成了dqai。我们可以大致看看封装的实现:

dispatch_queue_attr_info_t
_dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa)

	dispatch_queue_attr_info_t dqai =  ;
	// 串行队列直接返回空的 dqai 结构体
	if (!dqa) return dqai;

#if DISPATCH_VARIANT_STATIC
	if (dqa == &_dispatch_queue_attr_concurrent) 
		// ⚠️
		dqai.dqai_concurrent = true;
		return dqai;
	
#endif

	if (dqa < _dispatch_queue_attrs ||
			dqa >= &_dispatch_queue_attrs[DISPATCH_QUEUE_ATTR_COUNT]) 
#ifndef __APPLE__
		if (memcmp(dqa, &_dispatch_queue_attrs[0],
				sizeof(struct dispatch_queue_attr_s)) == 0) 
			dqa = (dispatch_queue_attr_t)&_dispatch_queue_attrs[0];
		 else
#endif // __APPLE__
		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
	

	size_t idx = (size_t)(dqa - _dispatch_queue_attrs);
	
	// 并发队列结构体位域的默认配置和赋值
	dqai.dqai_inactive = (idx % DISPATCH_QUEUE_ATTR_INACTIVE_COUNT);
	idx /= DISPATCH_QUEUE_ATTR_INACTIVE_COUNT;
	// ⚠️⚠️
	dqai.dqai_concurrent = !(idx % DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT);
	idx /= DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT;

	dqai.dqai_relpri = -(int)(idx % DISPATCH_QUEUE_ATTR_PRIO_COUNT);
	idx /= DISPATCH_QUEUE_ATTR_PRIO_COUNT;

	dqai.dqai_qos = idx % DISPATCH_QUEUE_ATTR_QOS_COUNT;
	idx /= DISPATCH_QUEUE_ATTR_QOS_COUNT;

	dqai.dqai_autorelease_frequency =
			idx % DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
	idx /= DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;

	dqai.dqai_overcommit = idx % DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
	idx /= DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;

	return dqai;

dqai里面有个dqai_concurrent的属性,顾名思义是代表是否是并发,那么默认的就是串行。
接下来继续看如何根据dqai创建队列的:

// 构造函数初始化  dqai.dqai_concurrent:是否是并发
_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
		DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
		(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

可以看到通过init方法初始化,第三个参数,如果是并发传入DISPATCH_QUEUE_WIDTH_MAX,如果是串行传入1。

#define DISPATCH_QUEUE_WIDTH_FULL_BIT		0x0020000000000000ull
#define DISPATCH_QUEUE_WIDTH_FULL			0x1000ull
#define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1)
#define DISPATCH_QUEUE_WIDTH_MAX  (DISPATCH_QUEUE_WIDTH_FULL - 2)
#define DISPATCH_QUEUE_USES_REDIRECTION(width) \\
		( uint16_t _width = (width); \\
		_width > 1 && _width < DISPATCH_QUEUE_WIDTH_POOL; )

而这里是DISPATCH_QUEUE_WIDTH_MAX的定义,可以计算其结果是14。

_dispatch_queue_init

我们再看init函数内部实现:

static inline dispatch_queue_class_t
_dispatch_queue_init(dispatch_queue_class_t dqu, dispatch_queue_flags_t dqf,
		uint16_t width, uint64_t initial_state_bits)

	uint64_t dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(width);
	dispatch_queue_t dq = dqu._dq;

	dispatch_assert((initial_state_bits & ~(DISPATCH_QUEUE_ROLE_MASK |
			DISPATCH_QUEUE_INACTIVE)) == 0);

	if (initial_state_bits & DISPATCH_QUEUE_INACTIVE) 
		dq->do_ref_cnt += 2; // rdar://8181908 see _dispatch_lane_resume
		if (dx_metatype(dq) == _DISPATCH_SOURCE_TYPE) 
			dq->do_ref_cnt++; // released when DSF_DELETED is set
		
	

	dq_state |= initial_state_bits;
	dq->do_next = DISPATCH_OBJECT_LISTLESS;
	// ⚠️⚠️
	dqf |= DQF_WIDTH(width);
	os_atomic_store2o(dq, dq_atomic_flags, dqf, relaxed);
	dq->dq_state = dq_state;
	dq->dq_serialnum =
			os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);
	return dqu;

可以得出:
如果是并发队列dqf |= DQF_WIDTH(DISPATCH_QUEUE_WIDTH_MAX) 如果是串行队列dqf |= DQF_WIDTH(1)。串行队列和并发队列最根本的区别就是DQF_WIDTH不同,串行队列的为1。这个width可以抽象的理解为队列出口的宽度。可以把串行队列想成一个单向单车道,把任务想成一辆辆车子,车子通过的时候必须一辆一辆按顺序通过;而并发队列可以想成单向多车道,有多个出口,车子可以并行通过。
继续看dq->dq_serialnum = os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);

// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - mgr_root_q
// 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
// 17 - workloop_fallback_q
// we use 'xadd' on Intel, so the initial value == next assigned
#define DISPATCH_QUEUE_SERIAL_NUMBER_INIT 17
extern unsigned long volatile _dispatch_queue_serial_numbers;

所以这里的_dispatch_queue_serial_numbers只是代表的是创建的队列的归属(串行还是并发),所以上面的dq->dq_serialnum = 1就是创建的主队列也是串行队列 。
再回到_dispatch_lane_create_with_target,看到下面有:

_dispatch_retain(tq);
dq->do_targetq = tq;

这个 tq 是在哪赋值的呢?向上找,在省略的部分找到:

// priority.h
#define DISPATCH_QOS_UNSPECIFIED        ((dispatch_qos_t)0)
#define DISPATCH_QOS_DEFAULT            ((dispatch_qos_t)4)

// queue.c
    ...
	else 
		if (overcommit == _dispatch_queue_attr_overcommit_unspecified) 
			// Serial queues default to overcommit!
			// 如果是并发 overcommit = _dispatch_queue_attr_overcommit_disabled
			// 如果是串行 overcommit = _dispatch_queue_attr_overcommit_enabled
			overcommit = dqai.dqai_concurrent ?
					_dispatch_queue_attr_overcommit_disabled :
					_dispatch_queue_attr_overcommit_enabled;
		
	
	if (!tq) 
		tq = _dispatch_get_root_queue(
				qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
				overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq;
		if (unlikely(!tq)) 
			DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
		
	
	...
static inline dispatch_queue_global_t
_dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)

	if (unlikely(qos < DISPATCH_QOS_MIN || qos > DISPATCH_QOS_MAX)) 
		DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
	
	// qos 为 4,4-1= 3
	// 2*3 + 0或者1 = 6/7
	// 然后再去数组 _dispatch_root_queues 里取数组的 6 或者 7 的下标指针地址
	return &_dispatch_root_queues[2 * (qos - 1) + overcommit];

tq 的值是通过 _dispatch_root_queues 数组取出来的,直接到数组里面看就一目了然了。由此可以发现 tq 就是 dq_label 的值,也就是外面队列 target 的值。

// 串行队列
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
	.dq_label = "com.apple.root.utility-qos.overcommit",
	.dq_serialnum = 9,
),
// 并发队列(全局和并发是一样的)
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK,
	.dq_label = "com.apple.root.default-qos",
	.dq_serialnum = 10,
),
// 主队列
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
		DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
	.dq_label = "com.apple.root.default-qos.overcommit",
	.dq_serialnum = 11,
),

既然串行队列和并发队列的 target 信息是从 _dispatch_root_queues 结构体数组取出来的,那么 _dispatch_root_queues 又是在哪创建的呢?我们来到最先初始化的 libdispatcdispatch_queue_createh_init 里的查找,最终在 _dispatch_introspection_init 里找到一些代码:

队列是通过 for 循环,调用 _dispatch_trace_queue_create,再取出 _dispatch_root_queues 里的地址指针一个一个创建出来的。

GCD深入了解

执行任务的方式

执行任务的函数分为两种,同步和异步函数:

  • 同步函数:即 dispatch_sync,必须等待当前语句执行完毕,才会执行下一条语
    句,不会开启线程,在当前执行任务。
  • 异步函数,即 dispatch_async。不用等待当前语句执行完毕,就可以执行下一条语句,会开启线程执行任务。

dispatch_sync 同步源码分析

找到源码中同步函数的实现:

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)

	uintptr_t dc_flags = DC_FLAG_BLOCK;
	if (unlikely(_dispatch_block_has_private_data(work))) 
		return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
	
	_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);

unlikely 的意思是基本上不会走,然后就来到 _dispatch_sync_f 函数,_dispatch_sync_f 的第三个参数是将 block 包装了一下:

#define _dispatch_Block_invoke(bb) \\
		((dispatch_function_t)((struct Block_layout *)bb)->invoke)

继续往下看:

static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		uintptr_t dc_flags)

	_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);

之后就会来到 _dispatch_sync_f_inline 函数,实现如下:

static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)

	// 串行就会走这下面
	if (likely(dq->dq_width == 1)) 
		return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
	

	if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) 
		DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
	

	dispatch_lane_t dl = upcast(dq)._dl;
	// Global concurrent queues and queues bound to non-dispatch threads
	// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
	if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) 
		return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
	

	if (unlikely(dq->do_targetq->do_targetq)) 
		return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
	
	_dispatch_introspection_sync_begin(dl);
	_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
			_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));

注意这里,调用了_dispatch_barrier_sync_f这个从名字看,最终调用了栅栏函数。但是为什么要调用栅栏函数呢?我们先继续往里分析:

static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)

	_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);

来到 _dispatch_barrier_sync_f_inline,这里的参数func就是我们外面的block任务:

static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)

	// 获取线程ID
	dispatch_tid tid = _dispatch_tid_self();

	if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) 
		DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
	

	dispatch_lane_t dl = upcast(dq)._dl;
	// 死锁
	if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) 
		return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
				DC_FLAG_BARRIER | dc_flags);
	

	if (unlikely(dl->do_targetq->do_targetq)) 
		return _dispatch_sync_recurse(dl, ctxt, func,
				DC_FLAG_BARRIER | dc_flags);
	
	_dispatch_introspection_sync_begin(dl);
	_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
			DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
					dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));

在这个函数里会先获取线程 id,因为队列需要绑定到线程然后依赖执行,而死锁的原因在于同步线程里的任务出现你等我,我等你的现象,所以只有 _dispatch_queue_try_acquire_barrier_sync 用到了线程 id

static inline bool
_dispatch_queue_try_acquire_barrier_sync(dispatch_queue_class_t dq, uint32_t tid)

	return _dispatch_queue_try_acquire_barrier_sync_and_suspend(dq._dl, tid, 0);

static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
		uint32_t tid, uint64_t suspend_count)

	uint64_t init  = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
	uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
			_dispatch_lock_value_from_tid(tid) |
			DISPATCH_QUEUE_UNCONTENDED_SYNC |
			(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
	uint64_t old_state, new_state;
	// 从 os 底层获取信息,也就是通过线程和当前队列获取 new_state 返回出去
	return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, 
		uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
		if (old_state != (init | role)) 
			os_atomic_rmw_loop_give_up(break);
		
		new_state = value | role;
	);

从 os 底层获取到了一个 new_state 之后,就会继续执行 _dispatch_sync_f_slow

static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
		dispatch_function_t func, uintptr_t top_dc_flags,
		dispatch_queue_class_t dqu, uintptr_t dc_flags)

	dispatch_queue_t top_dq = top_dqu._dq;
	dispatch_queue_t dq = dqu._dq;
	if (unlikely(!dq->do_targetq)) 
		return _dispatch_sync_function_invoke(dq, ctxt, func);
	

	pthread_priority_t pp = _dispatch_get_priority();
	// 初始化保存 block 以及其他信息的结构体
	struct dispatch_sync_context_s dsc = 
		.dc_flags    = DC_FLAG_SYNC_WAITER 以上是关于[OC学习笔记]多线程之GCD的主要内容,如果未能解决你的问题,请参考以下文章

iOS底层探索之多线程—GCD源码分析(栅栏函数)

[OC学习笔记]GCD复习

[OC学习笔记]GCD复习

[OC学习笔记]浅学GCD线程相关内容

[OC学习笔记]Grand Central Dispatch

iOS底层探索之多线程—GCD源码分析( 信号量dispatch_semaphore_t)