如何发布到 BufferBlock 并从 ActionBlock 中获取结果?
Posted
技术标签:
【中文标题】如何发布到 BufferBlock 并从 ActionBlock 中获取结果?【英文标题】:How to Post to a BufferBlock and get a result from the ActionBlock? 【发布时间】:2021-06-22 11:59:05 【问题描述】:有一个对象一次只能处理一个请求,处理它需要一点时间。任务完成后,它会引发一个返回结果的事件。下面代码中的对象是Computer
,假设我不能改变这个类的行为。
现在,我想创建一个包装类,让客户觉得他们可以随时发送请求。该请求现在是一个异步方法,因此客户端可以简单地等待,直到返回结果。当然,底层对象一次只能处理一个请求,所以包装器需要对请求进行排队,当“处理完成”事件到来时,需要将结果返回给相应的客户端。这个包装类在下面的代码中是SharedComputer
。
我想我需要在“Place2”返回从“Place1”获得的值。推荐的做法是什么? BufferBlock/ActionBlock 没有返回值的机制吗?
static void Main(string[] args)
SharedComputer pc = new SharedComputer();
for(int i =0; i<10; i++)
Task.Factory.StartNew(async() =>
var r = new Random();
int randomDelay = r.Next(500, 5000);
Thread.Sleep(randomDelay);
int random1 = r.Next(0, 10);
int random2 = r.Next(0, 10);
int sum = await pc.Add(random1, random2);
if(random1 + random2 == sum)
Debug.WriteLine($"Got correct answer: random1 + random2 = sum.");
else
Debug.WriteLine($"Got incorrect answer: random1 + random2 = sum.");
);
System.Console.Read();
class SharedComputer
Computer Mainframe= Computer.GetInstance();
BufferBlock<TwoNumbers> JobQueue = new BufferBlock<TwoNumbers>();
TaskCompletionSource<int> TCS;
public SharedComputer()
Mainframe.Calculated += Mainframe_Calculated;
var options = new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 1
;
var jobProcessor = new ActionBlock<TwoNumbers>(async e =>
Debug.WriteLine("Starting an adding");
TCS = new TaskCompletionSource<int>();
Mainframe.StartAdding(e.A, e.B);
var res = await TCS.Task; // Place1
Debug.WriteLine("Got the result.");
, options);
JobQueue.LinkTo(jobProcessor);
private void Mainframe_Calculated(object sender, int e)
TCS.SetResult(e);
public async Task<int> Add(int a, int b)
var data = new TwoNumbers()
A = a,
B = b
;
Debug.WriteLine("Queuing a new adding.");
JobQueue.Post<TwoNumbers>(data);
return 1;//Place2: Return the value received at Place1.
struct TwoNumbers
public int A;
public int B;
class Computer
static Computer Instance;
bool IsWorking = false;
private Computer()
public static Computer GetInstance()
if (Instance == null)
Instance = new Computer();
return Instance;
public event EventHandler<int> Calculated;
public void StartAdding(int a, int b)
if (IsWorking)
throw new InvalidOperationException("Already working.");
IsWorking = true;
Task.Factory.StartNew(() =>
Thread.Sleep(3000);
IsWorking = false;
Calculated(this, a + b);
);
【问题讨论】:
附带说明一下,您的代码会快速连续创建 10 个Random
实例,这使得其中一些实例可能会使用相同的种子进行播种。 BufferBlock
也可能是多余的。 ActionBlock
有自己的内部输入队列。您可以看到here 一种将工作发送到ActionBlock
的惯用方式,并在工作完成时收到通知(它使用嵌套任务而不是TaskCompletionSource
s)。
@TheodorZoulias 谢谢。关于冗余,您可能是对的。我看到的示例使用了 BufferBlock,所以我就这样使用它,但 ActionBlock 似乎也有 Post()。而 ActionBlockTaskCompletionSource
s 来实现相同的目的,但代码会更冗长。您需要将TwoNumbers
和 TaskCompletionSource
发布到ActionBlock
,从而需要包装器类型,或使用tuples 作为包装器。
【参考方案1】:
底层对象一次只能处理一个请求,因此包装器需要对请求进行排队,当“处理完成”事件到达时,需要将结果返回给相应的客户端。
所以你需要的是互斥。虽然您可以从 TPL Dataflow 和 TaskCompletionSource<T>
构建互斥锁,但使用内置的 SemaphoreSlim
会容易得多。
IMO 首先编写异步抽象,然后添加互斥会更简洁。异步抽象would look like:
public static class ComputerExtensions
public static Task<int> AddAsync(this Computer computer, int a, int b)
var tcs = new TaskCompletionSource<int>();
EventHandler<int> handler = null;
handler = (_, result) =>
computer.Calculated -= handler;
tcs.TrySetResult(result);
;
computer.Calculated += handler;
computer.StartAdding(a, b);
拥有异步 API 后,您可以通过 SemaphoreSlim
轻松应用异步限制(或互斥):
class SharedComputer
Computer Mainframe = Computer.GetInstance();
readonly SemaphoreSlim _mutex = new();
public async Task<int> AddAsync(int a, int b)
await _mutex.WaitAsync();
try return Mainframe.AddAsync(a, b);
finally _mutex.Release();
顺便说一句,use Task.Run
instead of Task.Factory.StartNew
。
【讨论】:
以上是关于如何发布到 BufferBlock 并从 ActionBlock 中获取结果?的主要内容,如果未能解决你的问题,请参考以下文章
BufferBlock<T> -> ActionBlock<T> 未触发
TPL Dataflow BufferBlock 线程安全吗?