延迟发布到 DataFlow

Posted

技术标签:

【中文标题】延迟发布到 DataFlow【英文标题】:Delayed Post to DataFlow 【发布时间】:2017-08-07 16:58:57 【问题描述】:

将项目发布到TPL DataFlow 时,是否有任何机制可以允许延迟发布?

public partial class BasicDataFlowService

    private readonly ActionBlock<string> workerBlock;

    public BasicDataFlowService()
    
        workerBlock = new ActionBlock<string>(file => DoWork(file), new ExecutionDataflowBlockOptions()
        
            MaxDegreeOfParallelism = 32
        );
    

    partial void DoWork(string fileName);

    private void AddToDataFlow(string file)
    
        workerBlock.Post(file);
    

AddToDataFlow 内,我希望能够在处理项目之前指定延迟(例如,如果我们决定将处理延迟 30 秒)。

我确实考虑过使用TransFormBlocknew System.Threading.ManualResetEvent(false).WaitOne(1000);,例如

var requeueBlock = new TransformBlock<string, string>(file =>

    new System.Threading.ManualResetEvent(false).WaitOne(1000);
    return file;
);

requeueBlock.LinkTo(workerBlock);

但是,这似乎会不必要地消耗一个线程,而该线程可能会被链中的其他块使用。

【问题讨论】:

【参考方案1】:

首先,您需要将ManualResetEvent 存储为单例,否则所有线程都会有自己的对象等待,您的方法将不起作用。

其次,如果您需要在管道中的一个AppDomain 内进行同步,请考虑使用ManualResetEventSlim 版本而不是重度ManualResetEvent

如果你想重用你机器的内核而不是无用的等待,你应该研究SpinWait轻量级结构。在这种情况下,您可能会发现 Joseph Albahari' article 很有用:

// singleton variable
bool _proceed;

var requeueBlock = new TransformBlock<string, string>(file =>

    var spinWait = new SpinWait();
    while (!_proceed)
    
        // ensure we have the latest _proceed value
        Thread.MemoryBarrier();
        // try to spin for a while
        // after some spins, yield to another thread
        spinWait.SpinOnce();
    
    return file;
);

SpinWait 内部决定如何让步:使用Sleep(0)Sleep(1)Yield 方法调用,因此对您的情况非常有效。

【讨论】:

【参考方案2】:

要在向workerBlock 发布值之前添加延迟,您可以简单地插入延迟并在发布值之前等待它。如果您的workerBlock 容量有限,您可以await SendAsync。实现目标的几个选项:

private async Task AddToDataflow(string file, TimeSpan delay) 
    await Task.Delay(delay);
    await workerBlock.SendAsync(file);


private async Task AddToDataflow(string file) 
    var delay = TimeSpan.FromSeconds(30);
    await Task.Delay(delay);
    await workerBlock.SendAsync(file);


private async void AddToDataflow(string file) 
    var delay = TimeSpan.FromSeconds(30);
    await Task.Delay(delay);
    workerBlock.Post(file);

【讨论】:

以上是关于延迟发布到 DataFlow的主要内容,如果未能解决你的问题,请参考以下文章

将 csv 数据发布/订阅到 Dataflow 到 BigQuery

从 DataFlow 加载到现有 BigQuery 表时是不是可以更新架构?

从 PubSub 导出到 BigQuery - Dataflow 没有任何反应

在 Dataflow SQL 中解析属性

在 Dataflow 中创建作业时出错(当前用户不能充当服务帐户)

Cloud Dataflow 到 BigQuery - 来源过多