TPL Dataflow BufferBlock 线程安全吗?
Posted
技术标签:
【中文标题】TPL Dataflow BufferBlock 线程安全吗?【英文标题】:Is TPL Dataflow BufferBlock thread safe? 【发布时间】:2018-11-27 10:31:40 【问题描述】:我有一个相当简单的生产者-消费者模式,其中(简化)我有两个生产者,他们生产的输出将由一个消费者消费。
为此,我使用System.Threading.Tasks.Dataflow.BufferBlock<T>
创建了一个BufferBlock
对象。一个Consumer
正在监听这个BufferBlock
,并处理任何接收到的输入。
同时有两个 'Producerssend data to the
BufferBlock`
简化:
BufferBlock<int> bufferBlock = new BufferBlock<int>();
async Task Consume()
while(await bufferBlock.OutputAvailable())
int dataToProcess = await outputAvailable.ReceiveAsync();
Process(dataToProcess);
async Task Produce1()
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
async Task Produce2()
IEnumerable<int> numbersToProcess = ...;
foreach (int numberToProcess in numbersToProcess)
await bufferBlock.SendAsync(numberToProcess);
// ignore result for this example
我想先启动消费者,然后将生产者作为单独的任务启动:
var taskConsumer = Consume(); // do not await yet
var taskProduce1 = Task.Run( () => Produce1());
var taskProduce2 = Task.Run( () => Produce2());
// await until both producers are finished:
await Task.WhenAll(new Task[] taskProduce1, taskProduce2);
bufferBlock.Complete(); // signal that no more data is expected in bufferBlock
// await for the Consumer to finish:
await taskConsumer;
乍一看,这正是生产者-消费者的含义:多个生产者生产数据,而消费者正在消费产生的数据。
然而,BufferBlock about thread safety 说:
不保证任何实例成员都是线程安全的。
我认为 TPL 中的 P 表示并行! 我应该担心吗?我的代码不是线程安全的吗? 我应该使用不同的 TPL 数据流类吗?
【问题讨论】:
MSDN 文章中的“线程安全”部分是出了名的不可靠。只是从文章模板中复制/粘贴,作者往往无法从开发人员那里获得足够的信息以使其更准确。请记住,仅仅因为 DataBlock 是线程安全的,并不会自动使您自己的代码也成为线程安全的。当另一个线程忙于添加项目时,像 Count 属性这样的东西是毫无价值的。希望显而易见,但未在文档中明确说明。 为什么是BufferBlock
而不是ActionBlock
? async Task Consume()
可以只替换为 Process(...)
调用,例如ActionBLock<int>(x => Process(x))
【参考方案1】:
是的,BufferBlock
类是线程安全的。我无法通过指向官方文档来支持这一说法,因为“线程安全”部分已从文档中删除。但是我可以在源代码中看到该类包含一个用于同步传入消息的锁定对象:
/// <summary>Gets the lock object used to synchronize incoming requests.</summary>
private object IncomingLock get return _source;
当调用Post
扩展方法(source code)时,显式实现的ITargetBlock.OfferMessage
方法被调用(source code)。以下是该方法的摘录:
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
T messageValue, ISourceBlock<T> source, bool consumeToAccept)
//...
lock (IncomingLock)
//...
_source.AddMessage(messageValue);
//...
如果这个类或包含在TPL Dataflow 库中的任何其他XxxBlock
类不是线程安全的,那确实会很奇怪。这会严重影响这个伟大库的易用性。
【讨论】:
【参考方案2】:我认为ActionBlock<T>
更适合您的工作,因为它有一个内置缓冲区,许多生产者可以通过该缓冲区发送数据。默认块选项处理单个后台任务上的数据,但您可以为并行度和有限容量设置新值。对于ActionBlock<T>
,确保线程安全的主要关注领域将在您传递的处理每条消息的委托中。该函数的操作必须独立于每条消息,即不能像任何Parrallel...
函数那样修改共享状态。
public class ProducerConsumer
private ActionBlock<int> Consumer get;
public ProducerConsumer()
Consumer = new ActionBlock<int>(x => Process(x));
public async Task Start()
var producer1Tasks = Producer1();
var producer2Tasks = Producer2();
await Task.WhenAll(producer1Tasks.Concat(producer2Tasks));
Consumer.Complete();
await Consumer.Completion;
private void Process(int data)
// process
private IEnumerable<Task> Producer1() => Enumerable.Range(0, 100).Select(x => Consumer.SendAsync(x));
private IEnumerable<Task> Producer2() => Enumerable.Range(0, 100).Select(x => Consumer.SendAsync(x));
【讨论】:
我关心的是 ActionBlock.SendAsync 函数的线程安全:两个不同的线程在同一个 ActionBlock 对象上调用 ActionBlock.SendAsync。 ActionBlock 类提到了与 BufferBlock 相同的关于线程安全的评论:SendAsync 不是线程安全的。是什么让您认为 ActionBlock 比 BufferBlock 更线程安全? 这句话是一个通用警告,只有静态方法才能保证是线程安全的。尽管如此,如果两个线程在同一个Acton/Buffer Block
上调用SendAsync
,那么两条消息将排队等待插入块输入缓冲区。 SendAsync
本身对于多个生产者来说是线程安全的,只要将块选项 SingleProducerConstrained 设置为 false
,这是默认行为。以上是关于TPL Dataflow BufferBlock 线程安全吗?的主要内容,如果未能解决你的问题,请参考以下文章