延迟发布到 DataFlow
Posted
技术标签:
【中文标题】延迟发布到 DataFlow【英文标题】:Delayed Post to DataFlow 【发布时间】:2017-08-07 16:58:57 【问题描述】:将项目发布到TPL DataFlow
时,是否有任何机制可以允许延迟发布?
public partial class BasicDataFlowService
private readonly ActionBlock<string> workerBlock;
public BasicDataFlowService()
workerBlock = new ActionBlock<string>(file => DoWork(file), new ExecutionDataflowBlockOptions()
MaxDegreeOfParallelism = 32
);
partial void DoWork(string fileName);
private void AddToDataFlow(string file)
workerBlock.Post(file);
在AddToDataFlow
内,我希望能够在处理项目之前指定延迟(例如,如果我们决定将处理延迟 30 秒)。
我确实考虑过使用TransFormBlock
和new System.Threading.ManualResetEvent(false).WaitOne(1000);
,例如
var requeueBlock = new TransformBlock<string, string>(file =>
new System.Threading.ManualResetEvent(false).WaitOne(1000);
return file;
);
requeueBlock.LinkTo(workerBlock);
但是,这似乎会不必要地消耗一个线程,而该线程可能会被链中的其他块使用。
【问题讨论】:
【参考方案1】:首先,您需要将ManualResetEvent
存储为单例,否则所有线程都会有自己的对象等待,您的方法将不起作用。
其次,如果您需要在管道中的一个AppDomain
内进行同步,请考虑使用ManualResetEventSlim
版本而不是重度ManualResetEvent
。
如果你想重用你机器的内核而不是无用的等待,你应该研究SpinWait
轻量级结构。在这种情况下,您可能会发现 Joseph Albahari' article 很有用:
// singleton variable
bool _proceed;
var requeueBlock = new TransformBlock<string, string>(file =>
var spinWait = new SpinWait();
while (!_proceed)
// ensure we have the latest _proceed value
Thread.MemoryBarrier();
// try to spin for a while
// after some spins, yield to another thread
spinWait.SpinOnce();
return file;
);
SpinWait
内部决定如何让步:使用Sleep(0)
、Sleep(1)
或Yield
方法调用,因此对您的情况非常有效。
【讨论】:
【参考方案2】:要在向workerBlock
发布值之前添加延迟,您可以简单地插入延迟并在发布值之前等待它。如果您的workerBlock
容量有限,您可以await SendAsync
。实现目标的几个选项:
private async Task AddToDataflow(string file, TimeSpan delay)
await Task.Delay(delay);
await workerBlock.SendAsync(file);
private async Task AddToDataflow(string file)
var delay = TimeSpan.FromSeconds(30);
await Task.Delay(delay);
await workerBlock.SendAsync(file);
private async void AddToDataflow(string file)
var delay = TimeSpan.FromSeconds(30);
await Task.Delay(delay);
workerBlock.Post(file);
【讨论】:
以上是关于延迟发布到 DataFlow的主要内容,如果未能解决你的问题,请参考以下文章
将 csv 数据发布/订阅到 Dataflow 到 BigQuery
从 DataFlow 加载到现有 BigQuery 表时是不是可以更新架构?
从 PubSub 导出到 BigQuery - Dataflow 没有任何反应