线程同步

Posted guo_jun2

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程同步相关的知识,希望对你有一定的参考价值。

同步主要有两种:通信和数据保护

一、数据同步的充分条件:多段代码、共享数据、修改数据

1.多段代码正在并发运行

2.这几段代码在访问(读或写)同一个数据

3.至少有一段代码在修改(写)数据

 案例1.

async Task<int> ModifyValueConcurrentlyAsync()
{
    var data = new SharedData();
    var task1 = ModifyValueAsync(data);
    var task2 = ModifyValueAsync(data);
    var task3 = ModifyValueAsync(data);
    await Task.WhenAll(task1, task2, task3);
    return data.Value;
}
async Task ModifyValueAsync(SharedData data)
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    data.Value = data.Value + 1;
}
class SharedData
{
    public int Value { get; set; }
}

ModifyValueConcurrentlyAsync如果是在GUI或ASP.NET上下文中调用的(或同一时间内只允许一段代码运行的任何其他上下文),就不需要同步。而从线程池调用就需要同步。

假如把数据改为私有成员作为共享数据,需要同步吗?同样从线程池调用需要同步,并且即使在“同一时间只运行一段代码”上下文中也需要同步,因为ModifyValueConcurrentlyAsync可能被多次调用,每个独立的调用都共享这个value,此时如果要避免这种共享就要同步,此时的同步其实是对异步方法的一种限流。

案例2.

void IndependentParallelism(IEnumerable<int> values)
{
  Parallel.ForEach(values, item => Trace.WriteLine(item)); 
}

Parallel把数据分配给各个线程,没有共享数据不需要同步

int result = 0;
Parallel.ForEach(
    source: values,
    localInit: () => 0,
    body: (item, state, localvalue) => localvalue + item,
    localFinally: localvalue => { result += localvalue; }
);

此时localFinally中多个线程同时访问并修改result,因此需要同步

案例3.

var stack = ImmutableStack<int>.Empty;
var task1 = Task.Run(() => Trace.WriteLine(stack.Push(3).Peek()));
var task2 = Task.Run(() => Trace.WriteLine(stack.Push(5).Peek()));
await Task.WhenAll(task1, task2, task3);
return stack.IsEmpty;

总是返回true,不需要同步。但是不可变集合有一个共享的根变量,它本身不是不可变的,下面代码中每个线程向栈压入一个值,然后修改这个变量,此时就需要同步。

var task1 = Task.Run(() => { stack = stack.Push(3); });
var task2 = Task.Run(() => { stack = stack.Push(5); });

线程安全集合本身是同步的,所以不需要同步

var dictionary = new ConcurrentDictionary<int, int>();
var task1 = Task.Run(() => { dictionary.TryAdd(2, 3); });
var task2 = Task.Run(() => { dictionary.TryAdd(3, 5); });

二、同步方式

信号是一种通用的线程通知机制,如果只是用来协调对共享数据的访问,就用锁;如果是一个线程间发送小块数据,可用生产者/消费者队列;其他情况用信号。

0.锁

  • 限制锁的作用范围:定义一个专用的成员,不要暴露给非本类的方法,不要lock(this)、Type或string类型实例
  • 明确锁保护的内容
  • 锁范围内的代码尽量少:不要做任何阻塞操作
  • 不运行随意的代码,包括引发事件、调用委托、调用虚拟方法

0.1、阻塞锁:lock、Monitor、SpinLock、ReaderWriterLockSlim

可能导致死锁,在访问关键资源时考虑用Monitor.TryEnter()来实现,可包装到泛型类中以便复用:

public sealed class LockHolder<T> : IDisposable where T:class
{
    private T handle;
    private bool holdsLock;
    public LockHolder(T handle, int milliSecondTimeOut)
    {
        this.handle = handle;
        holdsLock = Monitor.TryEnter(handle, milliSecondTimeOut);
    }
    public bool LockSuccessful
    {
        get { return holdsLock; }
    }
    public void Dispose()
    {
        if (holdsLock)
            Monitor.Exit(handle);
        holdsLock = false;
    }
}
object lockHandle = new object();
using (LockHolder<object> lockObj = new LockHolder<object>(lockHandle, 1000))
{
    if(lockObj.LockSuccessful)
    {
        //Work elided
    }
}

 

0.2、异步锁(兼容async)

private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);
private int _value;
public async Task DelayAndIncrementAsync()
{
    await _mutex.WaitAsync();
    try
    {
        var oldValue = _value;
        await Task.Delay(TimeSpan.FromSeconds(oldValue));
        _value = oldValue + 1;
    }
    finally
    {
        _mutex.Release();
    }
}

1.原子操作

2.Mutex

using (var m = new Mutex(false, MutexName))
{
    if(!m.WaitOne(TimeSpan.FromSeconds(5), false))
    {
        //Second instance is running
    }
    else
    {
        //dowork
        m.ReleaseMutex();
    }
}

3.SemaphoreSlim

4.AutoResetEvent

5.ManualResetEventSlim

private readonly ManualResetEventSlim _initialize = new ManualResetEventSlim();
private int _value;
public int WaitForInitialization()
{
    _initialize.Wait();
    return _value;
}
public void InitializeFromAnotherThread()
{
    _value = 13;
    _initialize.Set();
}

异步信号:TaskCompletionSource<T>

private readonly TaskCompletionSource<object> _initialized = new TaskCompletionSource<object>();
private int _value1;
private int _value2;

public async Task<int> WaitForInitializationAsync()
{
    await _initialized.Task;
    return _value1 + _value2;
}
public void Initializd()
{
    _value1 = 13;
    _value2 = 17;
    _initialized.TrySetResult(null);
}

如果只需要发送一次信号,这种方法很合适,但是如果要打开和关闭信号,这种方法就不太合适了,此时可用Nito.AsyncEx库中的AsyncManualResetEvent,该类相当于异步的ManualResetEvent。

private readonly AsyncManualResetEvent _connected = new AsyncManualResetEvent();
public async Task WaitForConnectedAsync()
{
    await _connected.WaitAsync();
}
public void ConnectedChanged(bool connected)
{
    if (connected)
        _connected.Set();
    else
        _connected.Reset();
}

6.CountDownEvent

7.Barrier

8.ReaderWriterLockSlim

  读锁是共享锁,写锁是排它锁,可先获取读锁根据当前数据再获取写锁,可最小化阻塞时间。

try
{
    _rw.EnterUpgradeableReadLock();
    if (!_items.ContainsKey(newkey))
    {
        try
        {
            _rw.EnterWriteLock();
            _items[newkey] = 1;
        }
        finally
        {
            _rw.ExitWriteLock();
        }
    }
}
finally
{
    _rw.ExitUpgradeableReadLock();
}

9.SpinWait

  自旋一定次数后,进入阻塞态

10.限流

  限制并发性,避免占用太多内存。PLINQ、Parallel、异步方法都有相应的限流

async Task<string[]> DownloadUrlsAsync(IEnumerable<string> urls)
{
    var httpClient = new HttpClient();
    var semaphore = new SemaphoreSlim(10);
    var tasks = urls.Select(async url =>
        {
            await semaphore.WaitAsync();
            try
            {
                return await httpClient.GetStringAsync(url);
            }
            finally
            {
                semaphore.Release();
            }
        }).ToArray();
    return await Task.WhenAll(tasks);
}

 

以上是关于线程同步的主要内容,如果未能解决你的问题,请参考以下文章

起底多线程同步锁(iOS)

多线程编程

第十次总结 线程的异步和同步

详解C++多线程

进程线程同步异步

配置 kafka 同步刷盘