具有延迟的 TPL 数据流队列
Posted
技术标签:
【中文标题】具有延迟的 TPL 数据流队列【英文标题】:TPL DataFlow Queue with Postponement【发布时间】:2021-01-30 11:47:12【问题描述】:我正在使用 `ActionBlock` 并发处理一个队列。
这里的一个问题是:在处理队列中的某个项目时,我可能需要先等待队列中的另一个项目处理完成,以满足该项目的依赖关系。
我认为应该可以借助 TPL Dataflow 库的链接(linking)、延迟(postponement)以及释放延迟等机制来实现这一点,但我不确定该使用哪种构造。
伪代码如下:
/// <summary>
/// A unit of work identified by <see cref="Name"/> that should only be processed after
/// every item named in <see cref="DependsOn"/> has been processed.
/// </summary>
public class Item
{
    public string Name { get; set; }

    // Names of the items this one depends on; starts out empty so callers can use a
    // collection initializer (DependsOn = { "Pear" }).
    public List<string> DependsOn = new List<string>();
}
// Declared first and assigned on the next statement: a lambda cannot use a local
// inside that local's own initializer, and the handler may need `block` (e.g. to
// re-post an item once its dependencies are satisfied).
ActionBlock<Item> block = null;
block = new ActionBlock<Item>(o =>
{
    if (!HasActionBlockProcessedAllDependencies(o.DependsOn))
    {
        // enqueue a callback when ALL dependencies have been completed
    }
    else
    {
        DoWork(o);
    }
},
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = resourceProcessorOptions.MaximumProviderConcurrency
});

// "Apple" depends on "Pear", so "Pear" has to be processed first.
var items = new[]
{
    new Item { Name = "Apple", DependsOn = { "Pear" } },
    new Item { Name = "Pear" }
};
【问题讨论】:
所以你不想处理某个项目,除非它所依赖的其他项目都已处理完毕? 是的——我希望在某个项目的所有依赖项都满足之后,以最高效的方式将它重新加入队列。 这是否属于一个更大的管道?我这么问是因为在这里 DataFlow 对解决问题帮助不大。虽然并非不可能,但看起来你只需要维护一个“依赖项 → 依赖它的项目”的字典,并在某个依赖项处理完成后发布依赖它的那些项目。 是的,这是一个很大的管道,我希望一次并发处理 10 个项目。 链接在这里帮不上什么忙。有几种方法,较复杂的一种是基于“队列字典”创建自定义块。不过,更简单的做法可能是:在依赖项处理完成之前不要把项目发布到第一个块,其结构与上面所述的类似。 【参考方案1】:我不确定这是否对您有帮助,但这里有一个自定义的 `DependencyTransformBlock`
类,它能感知所接收项目之间的依赖关系,只有在某个项目的所有依赖项都已成功处理之后,才会处理该项目。除 `EnsureOrdered` 选项外,此自定义块支持普通 `TransformBlock` 的所有内置功能。
这个类的构造函数接受一个 `Func<TInput, TKey>` lambda,用于获取每个项目的键;以及一个 `Func<TInput, IReadOnlyCollection<TKey>>` lambda,用于获取该项目的依赖项。键必须唯一;如果发现重复的键,该块将以失败(Faulted)状态完成。
如果项目之间存在循环依赖,受影响的项目将保持未处理状态。块完成后,可以通过属性 `TInput[] Unprocessed` 获取这些未处理的项目。此外,如果某个项目的任一依赖项从未被提供,该项目也会保持未处理状态。
/// <summary>
/// A dataflow block that transforms each received <typeparamref name="TInput"/> into a
/// <typeparamref name="TOutput"/>, but only after every item it depends on (identified by
/// key) has been transformed successfully.
/// </summary>
/// <remarks>
/// Keys must be unique: receiving a second input with an already-seen key faults the block.
/// Items that take part in a dependency cycle, or that depend on a key that is never
/// received, are never transformed; they can be retrieved through <see cref="Unprocessed"/>.
/// Output order does not follow input order (EnsureOrdered is effectively not honored),
/// because items whose dependencies become satisfied are re-queued through an internal buffer.
/// </remarks>
public class DependencyTransformBlock<TInput, TKey, TOutput> :
    ITargetBlock<TInput>, ISourceBlock<TOutput>
{
    private readonly ITargetBlock<TInput> _inputBlock;
    private readonly IPropagatorBlock<Item, TOutput> _transformBlock;

    private readonly object _locker = new object();
    private readonly Dictionary<TKey, Item> _items;

    // Count of outstanding reasons not to complete yet: each item scheduled for
    // transformation but not yet finished, plus the initial 1 that represents the
    // completion of _inputBlock. When it reaches 0 the internal buffer is completed.
    private int _pendingCount = 1;

    // Node of the dependency graph; one exists per key seen either as an input's key or
    // as a dependency key. Its mutable fields are read and written under _locker.
    private class Item
    {
        public TKey Key;
        public TInput Input;
        public bool HasInput;              // An input with this key has been received.
        public bool IsCompleted;           // The transform of this input succeeded.
        public HashSet<Item> Dependencies; // Not-yet-completed items this one waits for.
        public HashSet<Item> Dependents;   // Items that wait for this one.

        public Item(TKey key) => Key = key;
    }

    /// <summary>Creates a block with an asynchronous transform.</summary>
    /// <param name="transform">Produces the output for an input.</param>
    /// <param name="keySelector">Returns the (unique) key of an input.</param>
    /// <param name="dependenciesSelector">Returns the keys an input depends on; may return null or empty.</param>
    /// <param name="dataflowBlockOptions">Options for the internal transform block; defaults when null.</param>
    /// <param name="keyComparer">Comparer for keys; <see cref="EqualityComparer{T}.Default"/> when null.</param>
    /// <exception cref="ArgumentNullException">A delegate argument is null.</exception>
    public DependencyTransformBlock(
        Func<TInput, Task<TOutput>> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null)
    {
        if (transform == null)
            throw new ArgumentNullException(nameof(transform));
        if (keySelector == null)
            throw new ArgumentNullException(nameof(keySelector));
        if (dependenciesSelector == null)
            throw new ArgumentNullException(nameof(dependenciesSelector));

        dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
        keyComparer = keyComparer ?? EqualityComparer<TKey>.Default;
        _items = new Dictionary<TKey, Item>(keyComparer);

        // Registers every incoming input in the dependency graph and forwards it straight
        // to the transform block when none of its dependencies is still outstanding.
        _inputBlock = new ActionBlock<TInput>(async input =>
        {
            var key = keySelector(input);
            var dependencyKeys = dependenciesSelector(input);
            bool isReadyForProcessing = true;
            Item item;
            lock (_locker)
            {
                if (!_items.TryGetValue(key, out item))
                {
                    item = new Item(key);
                    _items.Add(key, item);
                }
                if (item.HasInput)
                    throw new InvalidOperationException($"Duplicate key ({key}).");
                item.Input = input;
                item.HasInput = true;

                if (dependencyKeys != null && dependencyKeys.Count > 0)
                {
                    item.Dependencies = new HashSet<Item>();
                    foreach (var dependencyKey in dependencyKeys)
                    {
                        // A dependency can be referenced before its own input arrives;
                        // create a placeholder node so the edge can be recorded now.
                        if (!_items.TryGetValue(dependencyKey, out var dependency))
                        {
                            dependency = new Item(dependencyKey);
                            _items.Add(dependencyKey, dependency);
                        }
                        if (!dependency.IsCompleted)
                        {
                            item.Dependencies.Add(dependency);
                            if (dependency.Dependents == null)
                                dependency.Dependents = new HashSet<Item>();
                            dependency.Dependents.Add(item);
                        }
                    }
                    isReadyForProcessing = item.Dependencies.Count == 0;
                }
                if (isReadyForProcessing) _pendingCount++;
            }
            if (isReadyForProcessing)
                await _transformBlock.SendAsync(item).ConfigureAwait(false);
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = 1
        });

        // Holds dependents that became ready while another item was being transformed.
        // It is unbounded, so posting to it from inside the lock never waits for capacity.
        var middleBuffer = new BufferBlock<Item>(new DataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = DataflowBlockOptions.Unbounded
        });

        _transformBlock = new TransformBlock<Item, TOutput>(async item =>
        {
            try
            {
                TInput input;
                lock (_locker)
                {
                    Debug.Assert(item.HasInput && !item.IsCompleted);
                    input = item.Input;
                }
                var result = await transform(input).ConfigureAwait(false);
                lock (_locker)
                {
                    item.IsCompleted = true;
                    if (item.Dependents != null)
                    {
                        foreach (var dependent in item.Dependents)
                        {
                            Debug.Assert(dependent.Dependencies != null);
                            var removed = dependent.Dependencies.Remove(item);
                            Debug.Assert(removed);
                            // Last outstanding dependency satisfied: schedule the dependent.
                            if (dependent.HasInput && dependent.Dependencies.Count == 0)
                            {
                                middleBuffer.Post(dependent);
                                _pendingCount++;
                            }
                        }
                    }
                    // Release references that are no longer needed once completed.
                    item.Input = default;
                    item.Dependencies = null;
                    item.Dependents = null;
                }
                return result;
            }
            finally
            {
                // Runs on success and failure alike, so the count always balances.
                lock (_locker)
                {
                    _pendingCount--;
                    if (_pendingCount == 0) middleBuffer.Complete();
                }
            }
        }, dataflowBlockOptions);

        middleBuffer.LinkTo(_transformBlock);

        // Completing the input only releases its share of _pendingCount; the buffer is
        // completed here only if no scheduled item is still in flight.
        PropagateCompletion(_inputBlock, middleBuffer,
            condition: () => { lock (_locker) { return --_pendingCount == 0; } });
        PropagateCompletion(middleBuffer, _transformBlock);
        PropagateFailure(_transformBlock, middleBuffer);
        PropagateFailure(_transformBlock, _inputBlock);
    }

    /// <summary>Creates a block with a synchronous transform.</summary>
    /// <exception cref="ArgumentNullException">A delegate argument is null.</exception>
    public DependencyTransformBlock(
        Func<TInput, TOutput> transform,
        Func<TInput, TKey> keySelector,
        Func<TInput, IReadOnlyCollection<TKey>> dependenciesSelector,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IEqualityComparer<TKey> keyComparer = null)
        : this(ToAsyncTransform(transform), keySelector, dependenciesSelector,
            dataflowBlockOptions, keyComparer)
    {
    }

    // Validates the synchronous transform *before* the chained constructor runs, so a
    // null argument is rejected before any internal block is created and wired up.
    private static Func<TInput, Task<TOutput>> ToAsyncTransform(Func<TInput, TOutput> transform)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
        return input => Task.FromResult(transform(input));
    }

    /// <summary>
    /// Inputs that were received but not transformed successfully, for example because of
    /// a dependency cycle or a dependency that was never received. Intended to be read after
    /// <see cref="Completion"/> has completed; while the block is running it is a snapshot
    /// that also contains items that are still waiting or in progress.
    /// </summary>
    public TInput[] Unprocessed
    {
        get
        {
            lock (_locker)
            {
                return _items.Values
                    .Where(item => item.HasInput && !item.IsCompleted)
                    .Select(item => item.Input)
                    .ToArray();
            }
        }
    }

    public Task Completion => _transformBlock.Completion;
    public void Complete() => _inputBlock.Complete();
    void IDataflowBlock.Fault(Exception ex) => _inputBlock.Fault(ex);

    DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(
        DataflowMessageHeader header, TInput value, ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return _inputBlock.OfferMessage(header, value, source, consumeToAccept);
    }

    TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<TOutput> target, out bool messageConsumed)
    {
        return _transformBlock.ConsumeMessage(header, target, out messageConsumed);
    }

    bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader header,
        ITargetBlock<TOutput> target)
    {
        return _transformBlock.ReserveMessage(header, target);
    }

    void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader header,
        ITargetBlock<TOutput> target)
    {
        _transformBlock.ReleaseReservation(header, target);
    }

    public IDisposable LinkTo(ITargetBlock<TOutput> target,
        DataflowLinkOptions linkOptions)
    {
        return _transformBlock.LinkTo(target, linkOptions);
    }

    // When source finishes: fault target with source's first inner exception, otherwise
    // complete target (only if condition is null or returns true).
    private async void PropagateCompletion(IDataflowBlock source,
        IDataflowBlock target, Func<bool> condition = null)
    {
        try { await source.Completion.ConfigureAwait(false); }
        catch { /* Outcome is inspected through source.Completion below. */ }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
        else if (condition == null || condition())
            target.Complete();
    }

    // When source faults, fault target with the same exception; other outcomes are ignored.
    private async void PropagateFailure(IDataflowBlock source,
        IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); }
        catch { /* Outcome is inspected through source.Completion below. */ }
        if (source.Completion.IsFaulted)
            target.Fault(source.Completion.Exception.InnerException);
    }
}
使用示例:
// Process each Item only after the items named in its DependsOn list; names are
// compared case-insensitively.
var block = new DependencyTransformBlock<Item, string, Item>(item =>
{
    DoWork(item);
    return item;
},
keySelector: item => item.Name,
dependenciesSelector: item => item.DependsOn,
dataflowBlockOptions: new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
},
keyComparer: StringComparer.OrdinalIgnoreCase);

// Feed items with SendAsync rather than Post: the block's internal input block is
// created with BoundedCapacity = 1, so Post can be declined while it is busy.
//...

// Discard the outputs so the block can complete (it then acts like an ActionBlock).
block.LinkTo(DataflowBlock.NullTarget<Item>());
在此示例中,该块链接到 `NullTarget` 以丢弃其输出,因此它实际上相当于一个 `ActionBlock`。
【讨论】:
以上是关于具有延迟的 TPL 数据流队列的主要内容,如果未能解决你的问题,请参考以下文章