BufferBlock 缺失值
Posted
技术标签:
【中文标题】BufferBlock 缺失值【英文标题】:BufferBlock missing values 【发布时间】:2021-10-18 09:08:36 【问题描述】:当发送消息之间的时间间隔很短时,来自Dataflow
库的BufferBlock
似乎丢失了值。代码如下:
private async static Task Main(string[] args)
await foreach (var x in Exec())
Console.WriteLine(x);
public static async IAsyncEnumerable<int> Exec()
BufferBlock<int> buffer = new BufferBlock<int>();
const int commandTime = 10;
var tasks = Enumerable
.Range(0, 10)
.Select(e =>
Task.Run(async () =>
var x = new Random().Next(0, commandTime);
await Task.Delay(x);
while (!await buffer.SendAsync(x)) ;
));
var t = Task.WhenAll(tasks);
while (!t.IsCompleted)
yield return await buffer.ReceiveAsync();
await Task.Delay(x)
代表对外部服务的调用。当我将commandTime
设置为 10 时,我只会得到一个结果(有时更多),但是当我延长命令的可能执行时间(例如 1000)时,我会得到全部 10 个结果。有人可以解释一下为什么 BufferBlock
不消耗价值观?
【问题讨论】:
您是否尝试将缓冲区块链接到执行 WriteLine 的操作块? 我不是 100% 确定,但我怀疑你的 while 条件是罪魁祸首。 @Fildor 没有while
结果是一样的。我是因为方法的描述才这么做的
@Fildor 主要问题是......为什么服务响应时间和将其发送到缓冲区之间存在相关性。
有些相关:How to implement an efficient WhenEach that streams an IAsyncEnumerable of task results?
【参考方案1】:
SendAsync 和 ReceiveAsync 本身等待结果(或者可以发送结果)。不需要循环。
我已在您的代码中添加了一些输出以使此行为更清晰。为了更好地演示,缓冲区块可以容纳 3 个元素。
public static class Program
public static async Task Main()
await foreach (var x in Exec())
Console.WriteLine(x);
Console.WriteLine("READY");
public static async IAsyncEnumerable<int> Exec()
// We define a buffer block that can hold 3 elements. The fourth element will have to wait.
BufferBlock<int> buffer = new BufferBlock<int>(new DataflowBlockOptions BoundedCapacity = 3 );
const int taskCount = 10;
var tasks = Enumerable
.Range(0, 10)
.Select((e, number) =>
Task.Run(async () =>
Console.WriteLine($"Sending number...");
// From documentation: If the target
// postpones the offered element, the element will be buffered until such time that
// the target consumes or releases it, at which point the task will complete, with
// its System.Threading.Tasks.Task`1.Result indicating whether the message was consumed.
// So we don't need a waiting loop.
var result = await buffer.SendAsync(number);
Console.WriteLine($"Result of SendAsync(number) is result.");
))
// Now our LINQ statement will be executed and 10 Tasks will write into the buffer.
.ToList();
// Our writing tasks are running and we receive the results as soon as they are available.
Console.WriteLine("Waiting for receive.");
for (int i = 0; i < taskCount; i++)
// To demonstrate that SendAsync will wait.
await Task.Delay(2000);
yield return await buffer.ReceiveAsync();
信号完成
接收循环知道消息的数量。如果接收方不知道这一点,发送方可以向 BufferBlock 发出完成信号:
public static async IAsyncEnumerable<int> Exec()
BufferBlock<int> buffer = new BufferBlock<int>(new DataflowBlockOptions BoundedCapacity = 3 );
const int taskCount = 10;
int sentNumbers = 0;
var tasks = Enumerable
.Range(0, taskCount)
.Select((e, number) =>
Task.Run(async () =>
var result = await buffer.SendAsync(number);
// We have sent the last number. Now we signal to the BufferBlock that it should
// not accept nor produce any more messages nor consume any more postponed messages.
if (Interlocked.Add(ref sentNumbers, 1) == taskCount)
buffer.Complete();
))
.ToList();
while (await buffer.OutputAvailableAsync())
yield return await buffer.ReceiveAsync();
【讨论】:
嗨 Micheal,这个await Task.Delay(2000)
在我的代码中放置在正确的位置,因为我希望该方法返回一些我将放置在缓冲区中的值
你可以在你的发送任务中写一个延迟(或任何你想要的地方),程序仍然可以工作。
非常好的解决方案 :) 我喜欢它!!!这也解决了我的代码问题。关于服务响应时间和缺失值(在我的代码中)之间的相关性的最初问题呢?
原始代码中有一个无限循环(如果没有完成发送到缓冲区,SendAsync 总是返回 true)。这些任务永远运行并以非常高的速度将数据发送到缓冲区。这可能会导致各种问题。
while (!t.IsCompleted)
是主要原因(我的错误,while (!await buffer.SendAsync(x))
不是无限循环,因为您否定了条件)。 t
是等待任务(WhenAll
的返回值。如果你的任务很快,所有发送任务在这条语句之前完成。所以循环执行一次。如果你的发送任务花费更多时间,你会捕获更多(但不是全部)在你的循环中。【参考方案2】:
你的代码有几个问题:
while (!await buffer.SendAsync(x))
循环正在自找麻烦。如果SendAsync
发送消息一次失败,它将永远失败。只有当目标块以DecliningPermanently
响应OfferMessage
时,SendAsync
才会发送消息失败。这发生在目标块完成时,无论是手动完成还是由于异常,它不再接受任何消息。在您的情况下,SendAsync
将始终成功,所以这不是问题,但在其他情况下,它可以很容易地将您的代码放入无限循环。
快速连续创建多个Random
实例可能会导致所有实例都使用相同的种子进行初始化,从而产生相同的随机数字序列。 AFAIK 这是 .NET Framework 的问题,而不是 .NET Core / .NET 5 的问题。恕我直言,如果您使用单个 Random
实例并同步访问它会更安全(因为 Random
类 @987654323 @)。
while (!t.IsCompleted)
循环引入了竞争条件。该任务可能在buffer
仍然包含消息的时刻完成。您可以尝试像这样修复它:while (!t.IsCompleted && buffer.Count > 0)
,但这只会将一种竞争条件换成另一种。这些属性不用于控制执行流程。正确的方法是使用信号机制,特别是OutputAvailableAsync
方法,如图here。
手动创建 10 个任务并使用 await Task.WhenAll
等待它们首先违背了使用 TPL 数据流的目的。这个库包含强大的组件,可以更轻松地完成相同的事情,具有更多选项,并且在异常情况下具有更好的行为。比如TransformBlock
。下面是我将如何重构您的代码,以利用该组件的强大功能:
public static async IAsyncEnumerable<int> Exec()
const int commandTime = 10;
var random = new Random();
var block = new TransformBlock<object, int>(async _ =>
int x; lock (random) x = random.Next(0, commandTime);
await Task.Delay(x);
return x;
, new ExecutionDataflowBlockOptions()
EnsureOrdered = false,
MaxDegreeOfParallelism = 5, // Optional
);
// Feeder
_ = Task.Run(async () =>
try
foreach (var _ in Enumerable.Range(0, 10))
bool accepted = await block.SendAsync(null);
if (!accepted) break; // The block has failed
block.Complete();
catch (Exception ex)
((IDataflowBlock)block).Fault(ex);
);
// Emit the results as they become available
while (await block.OutputAvailableAsync())
while (block.TryReceive(out var item))
yield return item;
await block.Completion; // Propagate possible exception
在这种情况下,发送到TransformBlock
的输入消息无关紧要,因此我将TInput
声明为object
,并将null
s 作为消息传递。
// Feeder
任务演示了如何在不干扰结果生成循环的单独异步工作流中向TransformBlock
提供消息。对于这个具体的例子来说,这并不是真正需要的,一个简单的foreach (var _ in Enumerable.Range(0, 10)) block.Post(null);
就足够了。
您也可以将馈送器实现为async void
方法,而不是一劳永逸的任务_ = Task.Run
。在实践中它不会产生任何影响,但理论上 async void
是更负责任的选择,因为它会传播任何未处理的异常而不是吞下它。
【讨论】:
感谢您的回复。您指出的竞争条件将我引向了解决方案。我错过了对缓冲区计数的检查。添加此检查 (while(!t.IsCompleted || buffer.Count !=0)
) 后,一切都按预期工作。此外,我将添加您发布的变压器。非常感谢您的解决方案
阻止我使用大部分代码的一个主要障碍是我的代码中事先不知道任务的数量。它们是从其他服务提供的,因此Complete
ing 缓冲区的意义是未知的。
@Pawel 如果事先不知道任务的数量,那么您如何知道示例中的taskCount
?我认为Task.WhenAll
比使用ActionBlock
或TransformBlock
更难处理未知数量的任务。
@Pawel TransformBlock
也会在结果生成后立即传播,前提是它配置了EnsureOrdered = false
,如我的示例所示。我在答案的second revision 中添加了它,所以您可能错过了它。
同意。完成缓冲区的时间呢?在你看来这可以实现吗? (当任务数量未知时)以上是关于BufferBlock 缺失值的主要内容,如果未能解决你的问题,请参考以下文章