线程同步
Posted guo_jun2
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程同步相关的知识,希望对你有一定的参考价值。
同步主要有两种:通信和数据保护
一、数据同步的充分条件:多段代码、共享数据、修改数据
1.多段代码正在并发运行
2.这几段代码在访问(读或写)同一个数据
3.至少有一段代码在修改(写)数据
案例1.
async Task<int> ModifyValueConcurrentlyAsync() { var data = new SharedData(); var task1 = ModifyValueAsync(data); var task2 = ModifyValueAsync(data); var task3 = ModifyValueAsync(data); await Task.WhenAll(task1, task2, task3); return data.Value; } async Task ModifyValueAsync(SharedData data) { await Task.Delay(TimeSpan.FromSeconds(1)); data.Value = data.Value + 1; } class SharedData { public int Value { get; set; } }
ModifyValueConcurrentlyAsync如果是在GUI或ASP.NET上下文中调用的(或同一时间内只允许一段代码运行的任何其他上下文),就不需要同步。而从线程池调用就需要同步。
假如把数据改为私有成员作为共享数据,需要同步吗?同样从线程池调用需要同步,并且即使在“同一时间只运行一段代码”上下文中也需要同步,因为ModifyValueConcurrentlyAsync可能被多次调用,每个独立的调用都共享这个value,此时如果要避免这种共享就要同步,此时的同步其实是对异步方法的一种限流。
案例2.
void IndependentParallelism(IEnumerable<int> values) { Parallel.ForEach(values, item => Trace.WriteLine(item)); }
Parallel把数据分配给各个线程,没有共享数据不需要同步
int result = 0; Parallel.ForEach( source: values, localInit: () => 0, body: (item, state, localvalue) => localvalue + item, localFinally: localvalue => { result += localvalue; } );
此时localFinally中多个线程同时访问并修改result,因此需要同步
案例3.
var stack = ImmutableStack<int>.Empty; var task1 = Task.Run(() => Trace.WriteLine(stack.Push(3).Peek())); var task2 = Task.Run(() => Trace.WriteLine(stack.Push(5).Peek())); await Task.WhenAll(task1, task2, task3); return stack.IsEmpty;
总是返回true,不需要同步。但是不可变集合有一个共享的根变量,它本身不是不可变的,下面代码中每个线程向栈压入一个值,然后修改这个变量,此时就需要同步。
var task1 = Task.Run(() => { stack = stack.Push(3); }); var task2 = Task.Run(() => { stack = stack.Push(5); });
线程安全集合本身是同步的,所以不需要同步
var dictionary = new ConcurrentDictionary<int, int>(); var task1 = Task.Run(() => { dictionary.TryAdd(2, 3); }); var task2 = Task.Run(() => { dictionary.TryAdd(3, 5); });
二、同步方式
信号是一种通用的线程通知机制,如果只是用来协调对共享数据的访问,就用锁;如果是一个线程间发送小块数据,可用生产者/消费者队列;其他情况用信号。
0.锁
- 限制锁的作用范围:定义一个专用的成员,不要暴露给非本类的方法,不要lock(this)、Type或string类型实例
- 明确锁保护的内容
- 锁范围内的代码尽量少:不要做任何阻塞操作
- 不运行随意的代码,包括引发事件、调用委托、调用虚拟方法
0.1、阻塞锁:lock、Monitor、SpinLock、ReaderWriterLockSlim
可能导致死锁,在访问关键资源时考虑用Monitor.TryEnter()来实现,可包装到泛型类中以便复用:
public sealed class LockHolder<T> : IDisposable where T:class { private T handle; private bool holdsLock; public LockHolder(T handle, int milliSecondTimeOut) { this.handle = handle; holdsLock = Monitor.TryEnter(handle, milliSecondTimeOut); } public bool LockSuccessful { get { return holdsLock; } } public void Dispose() { if (holdsLock) Monitor.Exit(handle); holdsLock = false; } } object lockHandle = new object(); using (LockHolder<object> lockObj = new LockHolder<object>(lockHandle, 1000)) { if(lockObj.LockSuccessful) { //Work elided } }
0.2、异步锁(兼容async)
private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1); private int _value; public async Task DelayAndIncrementAsync() { await _mutex.WaitAsync(); try { var oldValue = _value; await Task.Delay(TimeSpan.FromSeconds(oldValue)); _value = oldValue + 1; } finally { _mutex.Release(); } }
1.原子操作
2.Mutex
using (var m = new Mutex(false, MutexName)) { if(!m.WaitOne(TimeSpan.FromSeconds(5), false)) { //Second instance is running } else { //dowork m.ReleaseMutex(); } }
3.SemaphoreSlim
4.AutoResetEvent
5.ManualResetEventSlim
private readonly ManualResetEventSlim _initialize = new ManualResetEventSlim(); private int _value; public int WaitForInitialization() { _initialize.Wait(); return _value; } public void InitializeFromAnotherThread() { _value = 13; _initialize.Set(); }
异步信号:TaskCompletionSource<T>
private readonly TaskCompletionSource<object> _initialized = new TaskCompletionSource<object>(); private int _value1; private int _value2; public async Task<int> WaitForInitializationAsync() { await _initialized.Task; return _value1 + _value2; } public void Initializd() { _value1 = 13; _value2 = 17; _initialized.TrySetResult(null); }
如果只需要发送一次信号,这种方法很合适,但是如果要打开和关闭信号,这种方法就不太合适了,此时可用Nito.AsyncEx库中的AsyncManualResetEvent,该类相当于异步的ManualResetEvent。
private readonly AsyncManualResetEvent _connected = new AsyncManualResetEvent(); public async Task WaitForConnectedAsync() { await _connected.WaitAsync(); } public void ConnectedChanged(bool connected) { if (connected) _connected.Set(); else _connected.Reset(); }
6.CountDownEvent
7.Barrier
8.ReaderWriterLockSlim
读锁是共享锁,写锁是排它锁,可先获取读锁根据当前数据再获取写锁,可最小化阻塞时间。
try { _rw.EnterUpgradeableReadLock(); if (!_items.ContainsKey(newkey)) { try { _rw.EnterWriteLock(); _items[newkey] = 1; } finally { _rw.ExitWriteLock(); } } } finally { _rw.ExitUpgradeableReadLock(); }
9.SpinWait
自旋一定次数后,进入阻塞态
10.限流
限制并发性,避免占用太多内存。PLINQ、Parallel、异步方法都有相应的限流
async Task<string[]> DownloadUrlsAsync(IEnumerable<string> urls) { var httpClient = new HttpClient(); var semaphore = new SemaphoreSlim(10); var tasks = urls.Select(async url => { await semaphore.WaitAsync(); try { return await httpClient.GetStringAsync(url); } finally { semaphore.Release(); } }).ToArray(); return await Task.WhenAll(tasks); }
以上是关于线程同步的主要内容,如果未能解决你的问题,请参考以下文章