如何使用 C# 4 中的 TPL 创建一个常量处理“流”
Posted
技术标签:
【中文标题】如何使用 C# 4 中的 TPL 创建一个常量处理“流”【英文标题】:How can I create a constant Processing "Flow" using the TPL in C# 4 【发布时间】:2012-03-03 05:12:56 【问题描述】:我不确定以下是否可行,但我想以节流的方式在并行中调用一些操作,但要保持处理流程的连续性,而不是恢复使用计时器或循环/睡眠周期。
到目前为止,我已经让它从某个来源加载大量输入......然后以受控方式并行处理它们并像下面这样循环。
static void Main(string[] args)
while(true) //Simulate a Timer Elapsing...
IEnumerable<int> inputs = new List<int>() 1, 2, 3, 4, 5, 6, 7, 8, 9, 10;
//Simulate querying database queue tables for next batch of entries
RunAllActions(inputs, 3); //Max 3 at a time.
static void RunAllActions(IEnumerable<int> inputs, int maxConcurrency)
var options = new ParallelOptions() MaxDegreeOfParallelism = maxConcurrency;
Parallel.ForEach<int>(inputs, options, DoWork);
//Blocks here until all inputs are processed.
Console.WriteLine("Batch of Work Done!!!");
static void DoWork(int input)
Console.WriteLine("Starting Task 0", input);
System.Threading.Thread.Sleep(3000);
Console.WriteLine("Finishing Task 0", input);
我想知道的是,在 TPL 中是否有一个构造可以用来保持它始终运行......这样我就可以用收到的 MessageQueue 替换“Timer Elapses”和“Database Polling”事件。
以下是我想要实现的粗略版本...还有其他方法可以实现,但我想知道 TPL 中是否内置了这种模式。
internal class Engine
private MessageQueue mq;
private Queue<int> myInternalApplicationQueue;
public Engine()
//Message Queue to get new task inputs from
mq = new MessageQueue();
mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
// internal Queue to put them in.
myInternalApplicationQueue = new Queue<int>();
void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
//On MQ Receive, pop the input in a queue in my app
int input = (int) e.Message.Body;
myInternalApplicationQueue.Enqueue(input);
public void StartWorking()
//Once this gets called, it doesn't stop... it just keeps processing/watching that queue
//processing the tasks as fast as it's allowed while the app is running.
var options = new ParallelOptions() MaxDegreeOfParallelism = 3 ;
Parallel.KeepWorkingOnQueue<int>(myInternalApplicationQueue, options, DoWork);
// ^^^^^^^^^^^^^^^^^^ <----- THIS GUY
【问题讨论】:
【参考方案1】:您可以使用BlockingCollection<T>
来处理这种类型的操作,这实际上是一个生产者/消费者场景。
基本上,您会设置一个BlockingCollection<T>
并将其用作您的“生产者”。然后,您将拥有三个(或任意数量)consumer 任务(通常设置为长时间运行的任务)来处理元素(通过在标准 foreach 循环中调用 blockingCollection.GetConsumingEnumerable()
)。
然后,您可以根据需要将项目添加到集合中,它们将不断得到处理。当你完全完成后,你会调用BlockingCollection<T>.CompleteAdding
,这将导致 foreach 循环完成,整个事情停止。
附带说明 - 您通常不想在 GetConsumingEnumerable()
上使用 Parallel.ForEach
上的 BlockingCollection<T>
- 至少除非您自己处理分区,否则不会。通常最好使用多个任务并让每个任务按顺序进行迭代。原因是Parallel.ForEach
中的默认分区方案会导致问题(它会等到有“块”数据可用,而不是立即处理项目,并且“块”会随着时间的推移越来越大)。
【讨论】:
谢谢里德。会试一试的。【参考方案2】:正如 Reed 指出的那样,BlockingCollection 是一种很好的“手动”方式。缺点是您还必须自己管理消费者。
您可能想要研究的另一种方法是研究TPL Dataflow,这种方法可以让您无需进行大量协调工作。特别是在这样的场景中,您可以只使用ActionBlock<T>
,并且当消息从队列中进来时,您只需Post
将新数据块发送到ActionBlock<T>
,它将使用 TPL 工作线程自动处理它在被子下。这会让你的 Engine
类看起来有点像这样:
ActionBlock<int> myActionBlock = new ActionBlock<int>(this.ProcessWorkItem);
void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
int input = (int)e.Message.Body;
// Post the data to the action block
this.myActionBlock.Post(input);
private void ProcessWorkItem(int workItemData)
// ActionBlock will hand each work item to this method for processing
现在,就控制并行度或容量而言,您可以通过在构造ActionBlock<T>
时传入ExecutionDataflowBlockOptions
来轻松控制ActionBlock<T>
的这些细节。因此,假设我想确保我的并行度永远不会超过四个,并阻止生产者向队列中添加超过一百个项目。我会这样做:
ActionBlock<int> myActionBlock = new ActionBlock<int>(
this.ProcessWorkItem,
new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = 4,
BoundedCapacity = 100
);
【讨论】:
谢谢德鲁。这看起来真的很有趣。不幸的是,我现在被锁定在 .NET 4 或更低版本。 虽然 TDF 被嵌入到 .NET 4.5 中,但有一个单独的 TDF 下载位于 .NET4 TPL 之上。这可以从我在答案中链接的 TPL 数据流主页获得,但这是直接下载链接 (download.microsoft.com/download/F/9/6/…) 和自述文件 (msdn.microsoft.com/en-us/devlabs/gg585583) 干杯德鲁。是的,我已经在我自己的测试虚拟机上下载了它。但我正在做的工作将交付给现有的生产环境,这就是我锁定到 .NET 4.0 的意思。再次感谢。 @EoinCampbell 啊,我明白了。太糟糕了,因为它让生活变得轻松多了。听起来你只需要使用 BlockingCollection以上是关于如何使用 C# 4 中的 TPL 创建一个常量处理“流”的主要内容,如果未能解决你的问题,请参考以下文章