TPL DataFlow Queue with Postponement

Posted: 2021-01-30 11:47:12

Question:

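I am using an ActionBlock to process a queue concurrently.

The catch is that while processing one item in the queue, I may want to wait until a dependency is satisfied by another item in the queue being processed.

I think I should be able to do this with the TPL DataFlow library using linking, postponement, and release of postponement, but I'm not sure which constructs to use.

In pseudocode: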

public class Item
{
    public string Name { get; set; }
    public List<string> DependsOn = new List<string>();
}

ActionBlock<Item> block = null;
block = new ActionBlock<Item>(o =>
{
    if (!HasActionBlockProcessedAllDependencies(o.DependsOn))
    {
        // enqueue a callback when ALL dependencies have been completed
    }
    else
    {
        DoWork(o);
    }
},
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = resourceProcessorOptions.MaximumProviderConcurrency
});

var items = new[]
{
    new Item { Name = "Apple", DependsOn = { "Pear" } },
    new Item { Name = "Pear" }
};
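Comments:

So you don't want to process an item until the other items it depends on have been processed?

Yes. Once all of its dependencies are satisfied, I want to re-enqueue the item in the most efficient way possible.

Is this part of a larger pipeline? I ask because DataFlow does little to solve the problem here. It isn't impossible, but it seems you just want to keep a dictionary of dependencies with their dependents, and post the dependents once a dependency has been processed.

Yes, it's a big pipeline, and I want to process 10 items at a time.

Linking won't really help you here. There are several approaches; the more complex one is to create a custom block based on a dictionary of queues. However, it may be easier simply not to post an item to the first block until its dependencies have been processed, with a structure similar to what was already explained.

Answer 1: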

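I'm not sure whether this will help you, but here is a custom DependencyTransformBlock class that knows about the dependencies between the items it receives, and processes each item only after its dependencies have been processed successfully. This custom block supports all the built-in functionality of a normal TransformBlock, except for the EnsureOrdered option.

The constructors of this class accept a Func<TInput, TKey> lambda that retrieves the key of each item, and a Func<TInput, IReadOnlyCollection<TKey>> lambda that retrieves its dependencies. The keys are expected to be unique. If a duplicate key is found, the block completes in a faulted state.

If there are circular dependencies between items, the affected items remain unprocessed. The property TInput[] Unprocessed allows the unprocessed items to be retrieved after the block has completed. An item can also remain unprocessed if any of its required dependencies is never supplied.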

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class DependencyTransformBlock<TInput, TKey, TOutput> :
    ITargetBlock<TInput>, ISourceBlock<TOutput>
{
    private readonly ITargetBlock<TInput> _inputBlock;
    private readonly IPropagatorBlock<Item, TOutput> _transformBlock;

    private readonly object _locker = new object();
    private readonly Dictionary<TKey, Item> _items;

    private int _pendingCount = 1;
    // The initial 1 represents the completion of the _inputBlock

    private class Item
    {
        public TKey Key;
        public TInput Input;
        public bool HasInput;
        public bool IsCompleted;
        public HashSet<Item> Dependencies;
        public HashSet<Item> Dependents;

        public Item(TKey key) => Key = key;
    }

    public DependencyTransformBlock(
        Func<TInput, Task<TOutput>> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null)
    {
        if (transform == null)
            throw new ArgumentNullException(nameof(transform));
        if (keySelector == null)
            throw new ArgumentNullException(nameof(keySelector));
        if (dependenciesSelector == null)
            throw new ArgumentNullException(nameof(dependenciesSelector));

        dataflowBlockOptions =
            dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
        keyComparer = keyComparer ?? EqualityComparer<TKey>.Default;

        _items = new Dictionary<TKey, Item>(keyComparer);

        // Registers each incoming item and forwards it only if it has no
        // outstanding dependencies.
        _inputBlock = new ActionBlock<TInput>(async input =>
        {
            var key = keySelector(input);
            var dependencyKeys = dependenciesSelector(input);
            bool isReadyForProcessing = true;
            Item item;
            lock (_locker)
            {
                if (!_items.TryGetValue(key, out item))
                {
                    item = new Item(key);
                    _items.Add(key, item);
                }
                if (item.HasInput)
                    throw new InvalidOperationException($"Duplicate key ({key}).");
                item.Input = input;
                item.HasInput = true;

                if (dependencyKeys != null && dependencyKeys.Count > 0)
                {
                    item.Dependencies = new HashSet<Item>();
                    foreach (var dependencyKey in dependencyKeys)
                    {
                        if (!_items.TryGetValue(dependencyKey, out var dependency))
                        {
                            // Placeholder for a dependency that has not arrived yet
                            dependency = new Item(dependencyKey);
                            _items.Add(dependencyKey, dependency);
                        }
                        if (!dependency.IsCompleted)
                        {
                            item.Dependencies.Add(dependency);
                            if (dependency.Dependents == null)
                                dependency.Dependents = new HashSet<Item>();
                            dependency.Dependents.Add(item);
                        }
                    }
                    isReadyForProcessing = item.Dependencies.Count == 0;
                }
                if (isReadyForProcessing) _pendingCount++;
            }
            if (isReadyForProcessing)
            {
                await _transformBlock.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = 1
        });

        // Holds dependents that became ready after a dependency completed.
        var middleBuffer = new BufferBlock<Item>(new DataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = DataflowBlockOptions.Unbounded
        });

        _transformBlock = new TransformBlock<Item, TOutput>(async item =>
        {
            try
            {
                TInput input;
                lock (_locker)
                {
                    Debug.Assert(item.HasInput && !item.IsCompleted);
                    input = item.Input;
                }
                var result = await transform(input).ConfigureAwait(false);
                lock (_locker)
                {
                    item.IsCompleted = true;
                    if (item.Dependents != null)
                    {
                        foreach (var dependent in item.Dependents)
                        {
                            Debug.Assert(dependent.Dependencies != null);
                            var removed = dependent.Dependencies.Remove(item);
                            Debug.Assert(removed);
                            if (dependent.HasInput
                                && dependent.Dependencies.Count == 0)
                            {
                                middleBuffer.Post(dependent);
                                _pendingCount++;
                            }
                        }
                    }
                    item.Input = default; // Cleanup
                    item.Dependencies = null;
                    item.Dependents = null;
                }
                return result;
            }
            finally
            {
                lock (_locker)
                {
                    _pendingCount--;
                    if (_pendingCount == 0) middleBuffer.Complete();
                }
            }
        }, dataflowBlockOptions);

        middleBuffer.LinkTo(_transformBlock);

        PropagateCompletion(_inputBlock, middleBuffer,
            condition: () => { lock (_locker) return --_pendingCount == 0; });
        PropagateCompletion(middleBuffer, _transformBlock);
        PropagateFailure(_transformBlock, middleBuffer);
        PropagateFailure(_transformBlock, _inputBlock);
    }

    // Constructor with synchronous lambda
    public DependencyTransformBlock(
        Func<TInput, TOutput> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null) : this(
            input => Task.FromResult(transform(input)),
            keySelector, dependenciesSelector, dataflowBlockOptions, keyComparer)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
    }

    public TInput[] Unprocessed
    {
        get
        {
            lock (_locker) return _items.Values
                .Where(item => item.HasInput && !item.IsCompleted)
                .Select(item => item.Input)
                .ToArray();
        }
    }

    public Task Completion => _transformBlock.Completion;
    public void Complete() => _inputBlock.Complete();
    void IDataflowBlock.Fault(Exception ex) => _inputBlock.Fault(ex);

    DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(
        DataflowMessageHeader header, TInput value, ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return _inputBlock.OfferMessage(header, value, source, consumeToAccept);
    }

    TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<TOutput> target, out bool messageConsumed)
    {
        return _transformBlock.ConsumeMessage(header, target, out messageConsumed);
    }

    bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader header,
        ITargetBlock<TOutput> target)
    {
        return _transformBlock.ReserveMessage(header, target);
    }

    void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader header,
        ITargetBlock<TOutput> target)
    {
        _transformBlock.ReleaseReservation(header, target);
    }

    public IDisposable LinkTo(ITargetBlock<TOutput> target,
        DataflowLinkOptions linkOptions)
    {
        return _transformBlock.LinkTo(target, linkOptions);
    }

    private async void PropagateCompletion(IDataflowBlock source,
        IDataflowBlock target, Func<bool> condition = null)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
        else
            if (condition == null || condition()) target.Complete();
    }

    private async void PropagateFailure(IDataflowBlock source,
        IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
    }
}
Usage example:

var block = new DependencyTransformBlock<Item, string, Item>(item =>
{
    DoWork(item);
    return item;
},
keySelector: item => item.Name,
dependenciesSelector: item => item.DependsOn,
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
},
keyComparer: StringComparer.OrdinalIgnoreCase);

//...

block.LinkTo(DataflowBlock.NullTarget<Item>());

In this example the block is linked to a NullTarget in order to discard its output, so it essentially becomes the equivalent of an ActionBlock.
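
For comparison, the simpler alternative raised in the comments under the question (hold an item back and post it to the block only once everything it depends on has been processed) might look roughly like the sketch below. It is not the DependencyTransformBlock from this answer. DependencyGate, Demo and all of their members are hypothetical names made up for this illustration, the Item class is a stand-in for the one in the question, and the sketch assumes unique string keys, an unbounded ActionBlock, and no Add calls after Complete. It does no duplicate-key detection or cancellation handling.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Item
{
    public string Name { get; set; }
    public List<string> DependsOn { get; set; } = new List<string>();
}

// Hypothetical helper (not part of TPL Dataflow): keeps items in a waiting
// list until every key they depend on has been processed, then posts them.
public class DependencyGate
{
    private readonly object _locker = new object();
    private readonly HashSet<string> _completed = new HashSet<string>();
    private readonly List<Item> _waiting = new List<Item>();
    private readonly ActionBlock<Item> _block;
    private int _inFlight;      // items posted to the block but not yet finished
    private bool _noMoreInput;  // set once Complete() has been called

    public DependencyGate(Action<Item> doWork, int maxDegreeOfParallelism)
    {
        _block = new ActionBlock<Item>(item =>
        {
            doWork(item);
            OnProcessed(item);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        });
    }

    public Task Completion => _block.Completion;

    // Items still waiting; after completion these have a missing or circular dependency.
    public Item[] Unprocessed { get { lock (_locker) return _waiting.ToArray(); } }

    public void Add(Item item)
    {
        lock (_locker)
        {
            if (IsReady(item)) { _inFlight++; _block.Post(item); }
            else _waiting.Add(item);
        }
    }

    public void Complete()
    {
        lock (_locker)
        {
            _noMoreInput = true;
            if (_inFlight == 0) _block.Complete();
        }
    }

    private bool IsReady(Item item) => item.DependsOn.All(_completed.Contains);

    private void OnProcessed(Item item)
    {
        lock (_locker)
        {
            _completed.Add(item.Name);
            // Release every waiting item whose dependencies are now all done.
            foreach (var ready in _waiting.Where(IsReady).ToList())
            {
                _waiting.Remove(ready);
                _inFlight++;
                _block.Post(ready);
            }
            _inFlight--;
            if (_noMoreInput && _inFlight == 0) _block.Complete();
        }
    }
}

public static class Demo
{
    // Feeds the question's two items and returns the names in processing order.
    public static async Task<List<string>> RunAsync()
    {
        var order = new List<string>();
        var gate = new DependencyGate(
            item => { lock (order) order.Add(item.Name); }, 10);
        gate.Add(new Item { Name = "Apple", DependsOn = { "Pear" } });
        gate.Add(new Item { Name = "Pear" });
        gate.Complete();
        await gate.Completion;
        return order; // Pear is recorded before Apple
    }
}
```

The trade-off against the custom block is that this gate rescans the whole waiting list after every processed item and cannot be linked into a pipeline as a target or source, so it probably only makes sense for small item counts or as a starting point.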
