如何阻塞直到异步作业完成
Posted
技术标签:
【中文标题】如何阻塞直到异步作业完成【英文标题】:How to block until an asynchronous job finishes 【发布时间】:2010-09-20 04:09:44 【问题描述】:我正在开发一个 C# 库,该库使用 NVIDIA 的 CUDA 将某些工作任务卸载到 GPU。一个例子是使用扩展方法将两个数组相加:
float[] a = new float[] ...
float[] b = new float[] ...
float[] c = a.Add(b);
这段代码中的工作是在 GPU 上完成的。但是,我希望它异步完成,这样只有在需要结果时才会在 CPU 块上运行代码(如果结果尚未在 GPU 上完成)。为此,我创建了一个隐藏异步执行的 ExecutionResult 类。在使用中如下所示:
float[] a = new float[] ...
float[] b = new float[] ...
ExecutionResult res = a.Add(b);
float[] c = res; //Implicit converter
如果数据还没有准备好,程序会在最后一行阻塞。我不确定在 ExecutionResult 类中实现这种阻塞行为的最佳方法,因为我对同步线程和这类事情不是很有经验。
public class ExecutionResult<T>
private T[] result;
private long computed = 0;
internal ExecutionResult(T[] a, T[] b, Action<T[], T[], Action<T[]>> f)
f(a, b, UpdateData); //Asych call - 'UpdateData' is the callback method
internal void UpdateData(T[] data)
if (Interlocked.Read(ref computed) == 0)
result = data;
Interlocked.Exchange(ref computed, 1);
public static implicit operator T[](ExecutionResult<T> r)
//This is obviously a stupid way to do it
while (Interlocked.Read(ref r.computed) == 0)
Thread.Sleep(1);
return result;
传递给构造函数的 Action 是一个异步方法,它在 GPU 上执行实际工作。嵌套的Action是异步回调方法。
我主要关心的是如何最好/最优雅地处理转换器中完成的等待,以及是否有更合适的方法来解决整个问题。如果有什么我需要详细说明或进一步解释的,请发表评论。
【问题讨论】:
【参考方案1】:我不清楚你正在实现多少框架以及调用了多少其他代码,但我会尽可能遵循 .NET 中的"normal" async pattern。
【讨论】:
【参考方案2】:我发现该问题的解决方案是将一个函数传递给执行两件事的 ExecutionResult 构造函数。运行时,它开始异步工作,此外,它还返回另一个返回所需结果的函数:
private Func<T[]> getResult;
internal ExecutionResult(T[] a, T[] b, Func<T[], T[], Func<T[]>> asynchBinaryFunction)
getResult = asynchUnaryFunction(a);
public static implicit operator T[](ExecutionResult<T> r)
return r.getResult();
“getResult”函数会阻塞,直到数据被计算并从 GPU 中提取。这适用于 CUDA 驱动程序 API 的结构。
这是一个非常干净和简单的解决方案。由于 C# 允许通过访问本地范围来创建匿名函数,因此只需替换传递给 ExecutionResult 构造函数的方法的阻塞部分,这样......
...
status = LaunchGrid(func, length);
//Fetch result
float[] c = new float[length];
status = CUDADriver.cuMemcpyDtoH(c, ptrA, byteSize);
status = Free(ptrA, ptrB);
return c;
变成……
...
status = LaunchGrid(func, length);
return delegate
float[] c = new float[length];
CUDADriver.cuMemcpyDtoH(c, ptrA, byteSize); //Blocks until work is done
Free(ptrA, ptrB);
return c;
;
【讨论】:
【参考方案3】:我想知道您是否不能在这里使用常规的Delegate.BeginInvoke
/Delegate.EndInvoke
?如果不是,则可以选择等待句柄(例如 ManualResetEvent
):
using System.Threading;
static class Program
static void Main()
ThreadPool.QueueUserWorkItem(DoWork);
System.Console.WriteLine("Main: waiting");
wait.WaitOne();
System.Console.WriteLine("Main: done");
static void DoWork(object state)
System.Console.WriteLine("DoWork: working");
Thread.Sleep(5000); // simulate work
System.Console.WriteLine("DoWork: done");
wait.Set();
static readonly ManualResetEvent wait = new ManualResetEvent(false);
请注意,如果你真的想要,你可以只使用对象来做到这一点:
using System.Threading;
static class Program
static void Main()
object syncObj = new object();
lock (syncObj)
ThreadPool.QueueUserWorkItem(DoWork, syncObj);
System.Console.WriteLine("Main: waiting");
Monitor.Wait(syncObj);
System.Console.WriteLine("Main: done");
static void DoWork(object syncObj)
System.Console.WriteLine("DoWork: working");
Thread.Sleep(5000); // simulate work
System.Console.WriteLine("DoWork: done");
lock (syncObj)
Monitor.Pulse(syncObj);
【讨论】:
【参考方案4】:使用 cudaThreadSyncronize() 或 memcpy() 您可以执行同步操作 - 适用于 Invoke()。 CUDA 还允许您使用 callAsync() / sync() 请求异步内存传输 - 适用于使用 callAsync() 的 Begin/EndInvoke()。
【讨论】:
以上是关于如何阻塞直到异步作业完成的主要内容,如果未能解决你的问题,请参考以下文章