标记完成后重新打开 TPL 数据流输入
Posted
技术标签:
【中文标题】标记完成后重新打开 TPL 数据流输入【英文标题】:Reopen TPL Dataflow input after marking it complete 【发布时间】:2019-01-15 18:39:05 【问题描述】:我正在尝试创建一个处理管道服务,用户可以将项目放入其中并等待结果完成处理。我的想法是使用 DI 让它能够注入。
我面临的问题是,在处理完第一组数据并将输入块标记为完成后,当我尝试处理另一组数据时它仍然关闭。有没有办法重新打开管道以允许再次处理数据?
我还在 TPL 数据流上使用名为 DataflowEx 的库。
public interface IPipelineService
Task FillPipeline(object inputObj);
Task WaitForResults();
Task<List<object>> GetResults();
Task FlushPipeline();
Task Complete();
public class Pipeline : Dataflow<object>, IPipelineService
private TransformBlock<object, object> _inputBlock;
private ActionBlock<object> _resultBlock;
private List<object> _results get; set;
public Pipeline() : base(DataflowOptions.Default)
_results = new List<object>();
_inputBlock = new TransformBlock<object, object>(obj => Processing.Processing.ReceiveOrder(obj));
_resultBlock = new ActionBlock<object>(obj => _results.Add(Processing.Processing.ReturnProcessedOrder(obj)));
_inputBlock.LinkTo(_resultBlock, new DataflowLinkOptions() PropagateCompletion = true );
RegisterChild(_inputBlock);
RegisterChild(_resultBlock);
public Task FillPipeline(object inputObj)
//InputBlock.Post(inputObj);
return Task.CompletedTask;
public async Task WaitForResults()
await this.CompletionTask;
public Task<List<object>> GetResults()
return Task.FromResult(_results);
public Task FlushPipeline()
_results = new List<object>();
return Task.CompletedTask;
Task IPipelineService.Complete()
InputBlock.Complete();
return Task.CompletedTask;
public override ITargetBlock<object> InputBlock get return _inputBlock;
public object Result get return _results;
这是我目前用来测试这个想法的基本示例。
这就是我希望能够使用它并能够在它完成第一组处理后将物品送入其中的方式。
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.HomeLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.OtherLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.PersonalLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.CarLoan).order);
await _pipelineService.Complete();
await _pipelineService.WaitForResults();
【问题讨论】:
问题在于这一行InputBlock.Complete();
,一旦该块完成,它将不接受新输入,因此您必须创建一个新块将其重新链接到管道的开头。
@JSteward 让它像我预想的那样工作需要为放置在其中的每组数据生成新的输入块?这是有道理的,因为似乎一旦一个块完成,就没有办法将其恢复。
不幸的是没有,如果你传播完成,不要忘记替换下游块。
@JSteward 好的,感谢您的帮助。我认为这为目前的管道服务理念敲响了警钟。
【参考方案1】:
您无法重新启动已完成的数据流集 - 我只是重置我的对象以重新开始(在这种情况下,我在 CompleteAsync() 中调用 ResetDataFlow)
public class DownloadConnector
public DownloadDataFlow DataFlow get; set;
public DownloadConnector(int maxDop)
DataFlow = new DownloadDataFlow(maxDop);
public async Task SendAsync(DownloadItem item)
await DataFlow.BufferBlock.SendAsync(item);
public async Task CompleteAsync()
DataFlow.BufferBlock.Complete();
await DataFlow.ActionBlock.Completion;
DataFlow.ResetDataFlow();
public class DownloadDataFlow
public BufferBlock<DownloadItem> BufferBlock get; set;
public TransformBlock<DownloadItem, DownloadItem> TransformBlock get; set;
public ActionBlock<DownloadItem> ActionBlock get; set;
public int MaxDop get; set;
public DownloadDataFlow(int maxDop)
MaxDop = maxDop;
ResetDataFlow();
public DownloadDataFlow ResetDataFlow()
BufferBlock = new BufferBlock<DownloadItem>();
TransformBlock = new TransformBlock<DownloadItem, DownloadItem>(DownloadAsync);
ActionBlock = new ActionBlock<DownloadItem>(OnCompletion, new ExecutionDataflowBlockOptions MaxDegreeOfParallelism = MaxDop );
BufferBlock.LinkTo(TransformBlock, new DataflowLinkOptions PropagateCompletion = true );
TransformBlock.LinkTo(ActionBlock, new DataflowLinkOptions PropagateCompletion = true );
return this;
public async Task DownloadAsync(DownloadItem item)
...
public async Task OnCompletion(DownloadItem item)
...
public class DownloadItem
...
代码运行使用:
var connector = new DownloadConnector(10);
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.CompleteAsync();
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.SendAsync(new DownloadItem());
await connector.CompleteAsync();
【讨论】:
以上是关于标记完成后重新打开 TPL 数据流输入的主要内容,如果未能解决你的问题,请参考以下文章
TPL 数据流块永远不会在 PropagateCompletion 上完成
您是不是需要等待 TPL 数据流 DataflowBlock.NullTarget<T> 完成
我可以关闭/重新打开 InputStream 以模拟不支持标记的输入流的标记/重置吗?