Replace buffered value with latest in TPL Dataflow

Posted: 2021-09-21 07:30:29

Question:

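I need help getting a TPL Dataflow pipeline to update its input buffer with the latest value.

I subscribe to a live stream of elements, which are posted one by one to a Dataflow pipeline. Each element is processed, and processing takes time: processing an element takes much longer than producing it (a fast producer with a slow consumer).

However, if the input queue holds several elements with the same identity, only the most recent one needs to be processed. The ones in between can be discarded. This is the part I cannot figure out.

Here is an example of what I want to achieve: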

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public record Bid(int Id, int Value);

public static class Program
{
    public static async Task Main()
    {
        // This block is just here to log that an input is received.
        var startBlock = new TransformBlock<Bid, Bid>(d =>
        {
            Console.WriteLine("Input: {0} ({1})", d.Id, d.Value);
            return d;
        });

        //TODO: Check for duplicate identity (Bid.Id) and replace the
        // current element with the most recent one.
        var updateWithMostRecentBlock = new TransformBlock<Bid, Bid>(d => d);

        var processBlock = new TransformBlock<Bid, Bid>(async d =>
        {
            Console.WriteLine("Processing: {0} ({1})", d.Id, d.Value);
            await Task.Delay(1000);
            return d;
        });

        var finishBlock = new ActionBlock<Bid>(d =>
        {
            Console.WriteLine("Done: {0} ({1})", d.Id, d.Value);
        });

        var propagateCompletion = new DataflowLinkOptions { PropagateCompletion = true };
        startBlock.LinkTo(updateWithMostRecentBlock, propagateCompletion);
        updateWithMostRecentBlock.LinkTo(processBlock, propagateCompletion);
        processBlock.LinkTo(finishBlock, propagateCompletion);

        var data = new[]
        {
            new Bid(1, 0), // Processed immediately
            new Bid(1, 1), // Replaced with (1,2)
            new Bid(2, 0), // Replaced with (2,1)
            new Bid(1, 2), // Queued
            new Bid(2, 1)  // Queued
        };
        foreach (var d in data)
            startBlock.Post(d);

        startBlock.Complete();
        await finishBlock.Completion;
    }
}

When processBlock is ready to receive the next element, I want updateWithMostRecentBlock to offer only the most relevant element.

Actual output:

Input: 1 (0)
Input: 1 (1)
Input: 2 (0)
Input: 1 (2)
Input: 2 (1)
Processing: 1 (0)
Processing: 1 (1)
Done: 1 (0)
Processing: 2 (0)
Done: 1 (1)
Processing: 1 (2)
Done: 2 (0)
Processing: 2 (1)
Done: 1 (2)
Done: 2 (1)

Expected output:

Input: 1 (0)       // Immediately processed
Input: 1 (1)       // Replaced by (1,2)
Input: 2 (0)       // Replaced by (2,1)
Input: 1 (2)       // Queued
Input: 2 (1)       // Queued
Processing: 1 (0)
Done: 1 (0)
Processing: 1 (2)
Done: 1 (2)
Processing: 2 (1)
Done: 2 (1)

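Hint: an elegant solution by Stephen Toub does the exact opposite of what I want to achieve. His solution rejects all incoming elements and keeps the oldest one.

Comments:

It would probably be easier to create your own block with the logic you need.

There are many ways to achieve this. For example, you could keep a synchronized dictionary keyed on whatever you choose: when an item is buffered, just update the item in the dictionary; when it is dequeued, check the item, use it, and remove the dictionary entry. Obviously there is a lot of room for imagination, but it is very doable.

I tried a custom block but couldn't get it to work, because processBlock doesn't pull the next element when it is ready; updateWithMostRecentBlock has to push it. That would require some trigger that fires when processBlock is ready for more, and that information isn't available. I also haven't figured out how to access processBlock's input buffer...

I think you will need a fully custom block as the buffer, and give its target block a bounded capacity of 1. That creates the proper backpressure on the custom block while messages are being consumed, and that is when you do the replacement. Note that it's been a while since I built a custom block, but hopefully someone has time to work out these details for you.

Answer 1:

Sorry for answering my own question, but @TheGeneral's hint about bounded capacity put me on the right track.

I had to configure processBlock with a bounded capacity of 1: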

我必须配置 processBlock 以将有界容量设置为 1:

    var processBlock = new TransformBlock<Bid, Bid>(
        async d =>
        {
            Console.WriteLine("Processing: {0} ({1})", d.Id, d.Value);
            await Task.Delay(1000);
            return d;
        },
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 1
        });

Then I replaced updateWithMostRecentBlock with a custom block with this implementation:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class DiscardAndReplaceDuplicatesBlock<TValue, TKey> : IPropagatorBlock<TValue, TValue>
    where TKey : IEquatable<TKey>
{
    private readonly ITargetBlock<TValue> _target;
    private readonly IReceivableSourceBlock<TValue> _source;

    public DiscardAndReplaceDuplicatesBlock(Func<TValue, TKey> keyAccessor)
    {
        // One entry per key: the value, its pending SendAsync task and the token that can cancel it.
        var buffer = new ConcurrentDictionary<TKey, (TValue Value, Task Task, CancellationTokenSource Token)>();
        var outgoing = new BufferBlock<TValue>(new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 1,
            MaxMessagesPerTask = 1
        });
        var incoming = new ActionBlock<TValue>(value =>
        {
            var key = keyAccessor(value);
            var cts = new CancellationTokenSource();

            var isQueued = buffer.TryGetValue(key, out var previous);
            if (isQueued)
            {
                buffer.TryRemove(key, out var current);
                Console.WriteLine("Remove: {0}", current.Value);

                if (!previous.Task.IsCompleted)
                {
                    // The older value is still waiting for room in 'outgoing': withdraw it.
                    previous.Token.Cancel();
                    previous.Token.Dispose();
                    Console.WriteLine("Cancel: {0}", current.Value);
                }
            }

            var task = outgoing.SendAsync(value, cts.Token);
            if (task.IsCompleted)
            {
                cts.Dispose();
                Console.WriteLine("Sent: {0}", value);
                return;
            }

            buffer.AddOrUpdate(key, (value, task, cts), (k, t) => (value, task, cts));
            Console.WriteLine("Buffered: {0}", value);
        });

        incoming.Completion.ContinueWith(
            async t =>
            {
                if (t.IsFaulted)
                {
                    ((ITargetBlock<TValue>)outgoing).Fault(t.Exception.InnerException);
                }
                else
                {
                    // Let the still-pending sends finish before completing the output.
                    await WaitForBufferToCompleteAsync().ConfigureAwait(false);
                    outgoing.Complete();
                }
            },
            default,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        Task WaitForBufferToCompleteAsync()
        {
            if (!buffer.Any())
                return Task.CompletedTask;

            var buffered = buffer.Where(kvp => !kvp.Value.Task.IsCompleted);
            var tasks = buffered.Select(b => b.Value.Task);
            return Task.WhenAll(tasks);
        }

        _target = incoming;
        _source = outgoing;
    }

    public Task Completion =>
        _source.Completion;

    public void Complete() =>
        _target.Complete();

    public void Fault(Exception exception) =>
        _target.Fault(exception);

    public IDisposable LinkTo(ITargetBlock<TValue> target, DataflowLinkOptions linkOptions) =>
        _source.LinkTo(target, linkOptions);

    public TValue ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TValue> target, out bool messageConsumed) =>
        _source.ConsumeMessage(messageHeader, target, out messageConsumed);

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TValue messageValue, ISourceBlock<TValue>? source, bool consumeToAccept) =>
        _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);

    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TValue> target) =>
        _source.ReserveMessage(messageHeader, target);

    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TValue> target) =>
        _source.ReleaseReservation(messageHeader, target);
}

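It isn't pretty and it hasn't been tested in production, but it seems to work. To actually replace an element that has already been dispatched, I had to keep the cancellation token that was used, so that I can cancel a stale but unprocessed element. I'm not sure this is the best idea, so any criticism is welcome!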
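The cancellation trick can be seen in isolation. A `SendAsync` that is still postponed by a full bounded target can be withdrawn by canceling its token; the documented behavior is that the returned task then completes in the Canceled state and the target never gets the value. In the minimal sketch below a plain `BufferBlock` stands in for the custom block's `outgoing` buffer; the class `CancelStaleSendDemo` is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: withdraws a stale value whose SendAsync is still pending.
public static class CancelStaleSendDemo
{
    public static (bool StaleCanceled, bool LatestSent, string[] Received) Run()
    {
        var outgoing = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 1 });
        outgoing.Post("1 (0)");                                           // fills the only slot

        using var staleCts = new CancellationTokenSource();
        Task<bool> stale = outgoing.SendAsync("1 (1)", staleCts.Token);   // postponed: buffer is full

        // A newer value for the same key arrives: cancel the stale send, then send the new one.
        staleCts.Cancel();
        try { stale.Wait(); } catch (AggregateException) { /* expected: the task is Canceled */ }
        Task<bool> latest = outgoing.SendAsync("1 (2)");

        // Draining the buffer yields "1 (0)" and then "1 (2)"; "1 (1)" was withdrawn.
        var received = new[] { outgoing.Receive(), outgoing.Receive() };
        return (stale.IsCanceled, latest.Result, received);
    }
}
```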

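But note: this will also process element (1,1), because after (1,0) has been dispatched to processBlock, element (1,1) is successfully sent to the custom block's output buffer. I don't think that can be avoided.

Comments:

No need to be sorry. Answering your own question is welcome here, and actually encouraged!

Answer 2:

Here is my take on this problem: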

// Place inside a static class; uses System, System.Collections.Generic, System.Linq,
// System.Threading.Tasks and System.Threading.Tasks.Dataflow.
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockDropOldestByKey<TInput, TOutput, TKey>(
    Func<TInput, Task<TOutput>> transform,
    Func<TInput, TKey> keySelector,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IEqualityComparer<TKey> keyComparer = null,
    IProgress<TInput> droppedItems = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
    keyComparer = keyComparer ?? EqualityComparer<TKey>.Default;

    // Newest unprocessed item per key; outputBlock itself only ever receives keys.
    var dictionary = new Dictionary<TKey, TInput>(keyComparer);

    var outputBlock = new TransformManyBlock<TKey, TOutput>(async key =>
    {
        bool removed; TInput removedItem;
        lock (dictionary) removed = dictionary.Remove(key, out removedItem);
        // Stale key: its item was already taken under an earlier copy of the same key.
        if (!removed) return Enumerable.Empty<TOutput>();
        return new[] { await transform(removedItem).ConfigureAwait(false) };
    }, dataflowBlockOptions);

    var inputBlock = new ActionBlock<TInput>(item =>
    {
        var key = keySelector(item);
        bool dropped; TInput droppedItem;
        lock (dictionary)
        {
            // Overwrite any older, not yet processed item with the same key.
            dropped = dictionary.TryGetValue(key, out droppedItem);
            dictionary[key] = item;
        }
        if (dropped) droppedItems?.Report(droppedItem);
        return outputBlock.SendAsync(key);
    }, new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = 1,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        TaskScheduler = dataflowBlockOptions.TaskScheduler,
    });

    PropagateCompletion(inputBlock, outputBlock);
    PropagateFailure(outputBlock, inputBlock);

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);

    async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
        if (ex != null) target.Fault(ex); else target.Complete();
    }

    async void PropagateFailure(IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted) target.Fault(source.Completion.Exception);
    }
}

Usage example:

var droppedItems = new Progress<Bid>(b =>
{
    Console.WriteLine($"Dropped: {b.Id} ({b.Value})");
});

var processBlock = CreateTransformBlockDropOldestByKey<Bid, Bid, int>(async b =>
{
    Console.WriteLine($"Processing: {b.Id} ({b.Value})");
    await Task.Delay(1000);
    return b;
}, b => b.Id, droppedItems: droppedItems);

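The reason the two internal blocks, inputBlock and outputBlock, are not linked directly together is that a fault in outputBlock could otherwise leave inputBlock forever in a non-completed state. It is important that if either block fails, the other one fails too, so that any pending SendAsync operations targeting inputBlock are canceled. The two blocks are instead linked indirectly through the PropagateCompletion and PropagateFailure methods.

When configuring processBlock with BoundedCapacity, keep in mind that the block may hold keys in its input queue whose items have already been discarded, so it is advisable to set this option to a somewhat higher value.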
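The core idea of this answer is that outputBlock receives only keys and looks up the newest item at the moment it dequeues a key, so a key whose item was already taken produces no output (these are the discarded keys mentioned above). The single-threaded sketch below replays the question's five bids through that bookkeeping, without Dataflow and without the locking the real block needs. It assumes the consumer picks up (1,0) before the other bids arrive; `LatestWinsSimulation`, `Arrive` and `TryTake` are hypothetical names.

```csharp
using System.Collections.Generic;

// Hypothetical single-threaded sketch of the key-token bookkeeping (no Dataflow, no locking).
public static class LatestWinsSimulation
{
    public static List<string> Run()
    {
        var latest = new Dictionary<int, int>();  // newest unprocessed Value per Id
        var keys = new Queue<int>();               // one key token per arrival; may go stale
        var log = new List<string>();

        // Producer side: overwrite any unprocessed value for the same Id, enqueue a key token.
        void Arrive(int id, int value)
        {
            if (latest.TryGetValue(id, out var old))
                log.Add($"Dropped: {id} ({old})");
            latest[id] = value;
            keys.Enqueue(id);
        }

        // Consumer side: dequeue key tokens until one still has a value behind it.
        bool TryTake(out int id, out int value)
        {
            while (keys.Count > 0)
            {
                id = keys.Dequeue();
                if (latest.Remove(id, out value))
                    return true;
                // Stale token: the value was already taken via an earlier token for this Id.
            }
            id = 0;
            value = 0;
            return false;
        }

        // (1,0) arrives first and is picked up right away by the idle consumer.
        Arrive(1, 0);
        if (TryTake(out var firstId, out var firstValue))
            log.Add($"Processing: {firstId} ({firstValue})");

        // The rest arrive while (1,0) is still being processed.
        Arrive(1, 1);
        Arrive(2, 0);
        Arrive(1, 2);
        Arrive(2, 1);

        // The consumer becomes free again and drains what is left.
        while (TryTake(out var nextId, out var nextValue))
            log.Add($"Processing: {nextId} ({nextValue})");

        return log;
    }
}
```

The processed sequence is 1 (0), 1 (2), 2 (1), the same order as the question's expected output; the two trailing key tokens for Ids 1 and 2 are dequeued and skipped.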
