Replace buffered value with latest in TPL Dataflow
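【Title】: Replace buffered value with latest in TPL Dataflow 【Posted】: 2021-09-21 07:30:29 【Question】: I need help making a TPL Dataflow pipeline update an input buffer with the latest value.
I am subscribed to a live stream of elements, which are posted one by one onto a dataflow pipeline. Each element is processed, and processing one element takes much longer than producing it (i.e. fast producer, slow consumer).
However, if the input queue holds several elements with the same identity, only the most recent one needs to be processed. The ones in between can be discarded. This is the part I can't figure out.
Here is an example of what I'm trying to achieve: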
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public record Bid(int Id, int Value);

async Task Main()
{
    // This block is just here to log that an input is received.
    var startBlock = new TransformBlock<Bid, Bid>(d =>
    {
        Console.WriteLine("Input: {0} ({1})", d.Id, d.Value);
        return d;
    });

    //TODO: Check for duplicate identity (Bid.Id) and replace the
    // current element with the most recent one.
    var updateWithMostRecentBlock = new TransformBlock<Bid, Bid>(d => d);

    // Simulates the slow consumer: one second of work per element.
    var processBlock = new TransformBlock<Bid, Bid>(async d =>
    {
        Console.WriteLine("Processing: {0} ({1})", d.Id, d.Value);
        await Task.Delay(1000);
        return d;
    });

    var finishBlock = new ActionBlock<Bid>(d =>
    {
        Console.WriteLine("Done: {0} ({1})", d.Id, d.Value);
    });

    var propagateCompletion = new DataflowLinkOptions { PropagateCompletion = true };
    startBlock.LinkTo(updateWithMostRecentBlock, propagateCompletion);
    updateWithMostRecentBlock.LinkTo(processBlock, propagateCompletion);
    processBlock.LinkTo(finishBlock, propagateCompletion);

    var data = new[]
    {
        new Bid(1, 0), // Processed immediately
        new Bid(1, 1), // Replaced with (1,2)
        new Bid(2, 0), // Replaced with (2,1)
        new Bid(1, 2), // Queued
        new Bid(2, 1)  // Queued
    };

    foreach (var d in data)
        startBlock.Post(d);

    startBlock.Complete();
    await finishBlock.Completion;
}
When processBlock is ready to receive the next element, I want updateWithMostRecentBlock to offer only the most relevant element.
Actual output:
Input: 1 (0)
Input: 1 (1)
Input: 2 (0)
Input: 1 (2)
Input: 2 (1)
Processing: 1 (0)
Processing: 1 (1)
Done: 1 (0)
Processing: 2 (0)
Done: 1 (1)
Processing: 1 (2)
Done: 2 (0)
Processing: 2 (1)
Done: 1 (2)
Done: 2 (1)
Expected output:
Input: 1 (0) // Immediately processed
Input: 1 (1) // Replaced by (1,2)
Input: 2 (0) // Replaced by (2,1)
Input: 1 (2) // Queued
Input: 2 (1) // Queued
Processing: 1 (0)
Done: 1 (0)
Processing: 1 (2)
Done: 1 (2)
Processing: 2 (1)
Done: 2 (1)
Hint: Stephen Toub has an elegant solution that does the exact opposite of what I'm trying to achieve. His solution rejects all incoming elements and keeps the oldest one.
【Comments】:
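It's probably easier to create your own block with the logic you need. There are many ways to achieve this; for example, you could keep a synchronized dictionary keyed on your chosen identity: when an item is buffered, update the entry in the dictionary, and when it is dequeued, look the item up, use it, and remove the dictionary entry. Obviously that leaves a lot to the imagination, but it's very doable.

I tried a custom block but couldn't get it to work, because processBlock doesn't pull the next element when it's ready; updateWithMostRecentBlock has to push it. That would require some trigger to buffer against, fired when processBlock is ready, and that information isn't available. I haven't figured out how to access processBlock's input buffer...

I think you'd need a fully custom block acting as the buffer, with the target block having a bounded capacity of 1, so that proper back pressure is created on the custom block; the moment a message is consumed is when you do the replacement. Note that it's been a while since I built a custom block, but hopefully someone has time to put those details together for you.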
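The dictionary idea from the comments can be sketched on its own, without any dataflow plumbing. The following is only an illustration under assumptions: `LatestByKeyBuffer`, `Offer` and `TryTake` are hypothetical names that are not part of TPL Dataflow, and the consumer is assumed to pull from the buffer itself whenever it is ready.

```csharp
using System;
using System.Collections.Generic;

var buffer = new LatestByKeyBuffer<int, Bid>();

// The four bids that arrive while (1,0) is still being processed.
buffer.Offer(1, new Bid(1, 1));
buffer.Offer(2, new Bid(2, 0));
buffer.Offer(1, new Bid(1, 2)); // replaces (1,1)
buffer.Offer(2, new Bid(2, 1)); // replaces (2,0)

// The consumer pulls whenever it is ready and only ever sees the newest value per key.
while (buffer.TryTake(out var bid))
    Console.WriteLine($"Processing: {bid.Id} ({bid.Value})");
// prints "Processing: 1 (2)" and then "Processing: 2 (1)"

public record Bid(int Id, int Value);

// Hypothetical helper (not a TPL Dataflow type): keeps only the newest value per key.
public sealed class LatestByKeyBuffer<TKey, TValue> where TKey : notnull
{
    private readonly object _gate = new object();
    private readonly Dictionary<TKey, TValue> _latest = new Dictionary<TKey, TValue>();
    private readonly Queue<TKey> _order = new Queue<TKey>();

    // Stores the value; returns true if it replaced an older value with the same key.
    public bool Offer(TKey key, TValue value)
    {
        lock (_gate)
        {
            bool replaced = _latest.ContainsKey(key);
            if (!replaced) _order.Enqueue(key); // remember arrival order of new keys only
            _latest[key] = value;
            return replaced;
        }
    }

    // Removes and returns the newest value stored for the oldest queued key.
    public bool TryTake(out TValue value)
    {
        lock (_gate)
        {
            if (_order.Count == 0)
            {
                value = default;
                return false;
            }
            TKey key = _order.Dequeue();
            return _latest.Remove(key, out value);
        }
    }
}
```

The hard part discussed in this thread is not the dictionary but the hand-off: dataflow blocks push messages to their targets, so something still has to tell the buffer when the consumer is ready.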
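【Answer 1】:
Sorry for answering my own question, but @TheGeneral put me on the right track with his hint about bounded capacity.
I had to configure processBlock with a bounded capacity of 1: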
var processBlock = new TransformBlock<Bid, Bid>(
    async d =>
    {
        Console.WriteLine("Processing: {0} ({1})", d.Id, d.Value);
        await Task.Delay(1000);
        return d;
    },
    new ExecutionDataflowBlockOptions
    {
        // Accept at most one element at a time, so upstream blocks feel back pressure.
        BoundedCapacity = 1
    });
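To see what a bounded capacity of 1 means for the sender, here is a minimal sketch. It assumes an `ActionBlock<int>` stand-in for the slow consumer and a hypothetical `gate` that keeps the first item in processing until it is released; neither is part of the original pipeline.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical gate: the block stays busy with its first item until the gate is released.
var gate = new TaskCompletionSource<bool>();

var slowBlock = new ActionBlock<int>(
    async _ => { await gate.Task; },
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

bool firstAccepted = slowBlock.Post(1);       // the single slot is free
bool secondAccepted = slowBlock.Post(2);      // slot occupied: Post is declined immediately
Task<bool> third = slowBlock.SendAsync(3);    // slot occupied: SendAsync waits instead
bool thirdCompletedEarly = third.IsCompleted;

Console.WriteLine($"{firstAccepted} {secondAccepted} {thirdCompletedEarly}");
// prints "True False False"

gate.SetResult(true);                         // item 1 finishes and frees the slot
bool thirdAccepted = await third;             // the waiting item is accepted now
Console.WriteLine(thirdAccepted);             // prints "True"

slowBlock.Complete();
await slowBlock.Completion;
```

A pending `SendAsync` is the window in which an outdated element can still be withdrawn, which is why the custom block below passes a cancellation token to it.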
Then I replaced updateWithMostRecentBlock with a custom block that has this implementation:
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class DiscardAndReplaceDuplicatesBlock<TValue, TKey> : IPropagatorBlock<TValue, TValue>
    where TKey : IEquatable<TKey>
{
    private readonly ITargetBlock<TValue> _target;
    private readonly IReceivableSourceBlock<TValue> _source;

    public DiscardAndReplaceDuplicatesBlock(Func<TValue, TKey> keyAccessor)
    {
        // Elements whose SendAsync to the output buffer is still pending, by key.
        var buffer = new ConcurrentDictionary<TKey, (TValue Value, Task Task, CancellationTokenSource Token)>();

        var outgoing = new BufferBlock<TValue>(new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 1,
            MaxMessagesPerTask = 1
        });

        var incoming = new ActionBlock<TValue>(value =>
        {
            var key = keyAccessor(value);
            var cts = new CancellationTokenSource();

            var isQueued = buffer.TryGetValue(key, out var previous);
            if (isQueued)
            {
                buffer.TryRemove(key, out var current);
                Console.WriteLine("Remove: {0}", current.Value);

                // Withdraw the outdated element if it has not been accepted yet.
                if (!previous.Task.IsCompleted)
                {
                    previous.Token.Cancel();
                    previous.Token.Dispose();
                    Console.WriteLine("Cancel: {0}", current.Value);
                }
            }

            var task = outgoing.SendAsync(value, cts.Token);
            if (task.IsCompleted)
            {
                cts.Dispose();
                Console.WriteLine("Sent: {0}", value);
                return;
            }

            buffer.AddOrUpdate(key, (value, task, cts), (k, t) => (value, task, cts));
            Console.WriteLine("Buffered: {0}", value);
        });

        // Complete (or fault) the output once the input is done and all pending sends have finished.
        incoming.Completion.ContinueWith(
            async t =>
            {
                if (t.IsFaulted)
                {
                    ((ITargetBlock<TValue>)outgoing).Fault(t.Exception.InnerException);
                }
                else
                {
                    await WaitForBufferToCompleteAsync().ConfigureAwait(false);
                    outgoing.Complete();
                }
            },
            default,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        Task WaitForBufferToCompleteAsync()
        {
            if (!buffer.Any())
                return Task.CompletedTask;

            var buffered = buffer.Where(kvp => !kvp.Value.Task.IsCompleted);
            var tasks = buffered.Select(b => b.Value.Task);
            return Task.WhenAll(tasks);
        }

        _target = incoming;
        _source = outgoing;
    }

    public Task Completion =>
        _source.Completion;

    public void Complete() =>
        _target.Complete();

    public void Fault(Exception exception) =>
        _target.Fault(exception);

    public IDisposable LinkTo(ITargetBlock<TValue> target, DataflowLinkOptions linkOptions) =>
        _source.LinkTo(target, linkOptions);

    public TValue ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TValue> target, out bool messageConsumed) =>
        _source.ConsumeMessage(messageHeader, target, out messageConsumed);

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TValue messageValue, ISourceBlock<TValue>? source, bool consumeToAccept) =>
        _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);

    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TValue> target) =>
        _source.ReserveMessage(messageHeader, target);

    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TValue> target) =>
        _source.ReleaseReservation(messageHeader, target);
}
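It isn't pretty and it hasn't been tested in production, but it seems to work. To actually replace an element that has already been dispatched, I have to hold on to the cancellation token that was used, so that I can cancel an outdated element that hasn't been processed yet. I'm not sure this is the best idea, so any criticism is welcome!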
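One caveat, though: this will also process element (1,1), because once (1,0) has been dispatched to processBlock, element (1,1) is sent successfully to the custom block's output buffer. I don't think this can be avoided.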
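The caveat can be reproduced in isolation. The sketch below is an assumption-laden stand-in, not the answer's block: a plain `BufferBlock<string>` plays the role of the custom block's output buffer, a gated `ActionBlock<string>` plays the role of processBlock, and `gate` is a hypothetical helper that keeps the first element in processing.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical gate: keeps the first element "in processing" until released.
var gate = new TaskCompletionSource<bool>();

// Stand-ins for the custom block's output buffer and for processBlock.
var outgoing = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 1 });
var process = new ActionBlock<string>(
    async _ => { await gate.Task; },
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
outgoing.LinkTo(process, new DataflowLinkOptions { PropagateCompletion = true });

await outgoing.SendAsync("1 (0)");   // forwarded to process, which starts working on it
await outgoing.SendAsync("1 (1)");   // accepted as soon as "1 (0)" has left the buffer
Task<bool> pending = outgoing.SendAsync("1 (2)"); // the buffer still holds "1 (1)"
bool thirdCompletedEarly = pending.IsCompleted;

Console.WriteLine($"Third send completed: {thirdCompletedEarly}");
// prints "Third send completed: False"

gate.SetResult(true);                // let the consumer drain everything
await pending;
outgoing.Complete();
await process.Completion;
```

Only the third send is still pending and therefore cancellable. The second element has already been accepted by the output buffer, so with two bounded slots in a row (one in the buffer, one in the consumer) one stale element can still slip through.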
【Comments】:
No need to be sorry. Answering our own questions is welcome here and actually encouraged!
【Answer 2】:
Here is my take on this problem:
// Requires: System, System.Collections.Generic, System.Linq,
// System.Threading.Tasks, System.Threading.Tasks.Dataflow
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockDropOldestByKey<TInput, TOutput, TKey>(
    Func<TInput, Task<TOutput>> transform,
    Func<TInput, TKey> keySelector,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IEqualityComparer<TKey> keyComparer = null,
    IProgress<TInput> droppedItems = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
    keyComparer = keyComparer ?? EqualityComparer<TKey>.Default;

    // Holds the most recent item per key; guarded by lock (dictionary).
    var dictionary = new Dictionary<TKey, TInput>(keyComparer);

    var outputBlock = new TransformManyBlock<TKey, TOutput>(async key =>
    {
        bool removed; TInput removedItem;
        lock (dictionary) removed = dictionary.Remove(key, out removedItem);
        // The key was already handled by an earlier message: emit nothing.
        if (!removed) return Enumerable.Empty<TOutput>();
        return new[] { await transform(removedItem).ConfigureAwait(false) };
    }, dataflowBlockOptions);

    var inputBlock = new ActionBlock<TInput>(item =>
    {
        var key = keySelector(item);
        bool dropped; TInput droppedItem;
        lock (dictionary)
        {
            dropped = dictionary.TryGetValue(key, out droppedItem);
            dictionary[key] = item;
        }
        if (dropped) droppedItems?.Report(droppedItem);
        return outputBlock.SendAsync(key);
    }, new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = 1,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        TaskScheduler = dataflowBlockOptions.TaskScheduler,
    });

    PropagateCompletion(inputBlock, outputBlock);
    PropagateFailure(outputBlock, inputBlock);

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);

    async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
        if (ex != null) target.Fault(ex); else target.Complete();
    }

    async void PropagateFailure(IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted) target.Fault(source.Completion.Exception);
    }
}
Usage example:
var droppedItems = new Progress<Bid>(b =>
{
    Console.WriteLine($"Dropped: {b.Id} ({b.Value})");
});
var processBlock = CreateTransformBlockDropOldestByKey<Bid, Bid, int>(async b =>
{
    Console.WriteLine($"Processing: {b.Id} ({b.Value})");
    await Task.Delay(1000);
    return b;
}, b => b.Id, droppedItems: droppedItems);
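The two internal blocks, inputBlock and outputBlock, are not linked together directly because a fault in outputBlock could otherwise leave inputBlock stuck in a non-completed state forever. It is important that if one of the two blocks fails, the other fails too, so that any pending SendAsync operations towards inputBlock are canceled. The blocks are linked together indirectly, through the PropagateCompletion and PropagateFailure methods.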
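When configuring processBlock with BoundedCapacity, keep in mind that the block's input queue may contain keys whose items have already been dropped, so it is advisable to set this option to a slightly higher value.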