以并行和顺序方式执行N个线程

Posted

技术标签:

【中文标题】以并行和顺序方式执行N个线程【英文标题】:Executing N number of threads in parallel and in a sequential manner 【发布时间】:2016-01-28 10:03:09 【问题描述】:

我有一个应用程序,其中我有 1 个大文件的 1000 多个小部分。

我一次最多只能上传 16 个部分。

我使用了 .Net 的 Thread 并行库。

我使用 Parallel.For 将其划分为多个部分,并为每个部分分配了 1 个方法,并将 DegreeOfParallelism 设置为 16。

我需要使用由不同部分上传生成的校验和值执行 1 个方法,因此我必须设置某种机制,我必须等待所有部分上传完成,例如 1000。 在 TPL 库中,我面临一个问题是它随机执行 1000 个中的 16 个线程中的任何一个。

我想要一些机制,我可以使用它最初运行前 16 个线程,如果第一个或第二个或 16 个线程中的任何一个完成其任务,则应该开始下一个第 17 部分。

我怎样才能做到这一点?

【问题讨论】:

我喜欢这张图片 如果@usr 的回答不起作用,请查看可能适用的my answer here。否则,如果您可以使用 TPL 的 DataflowBlock 类可能会更好(我认为您不能,因为您指定了 C# 4) @Abdullah 它只是缺少一些手绘的红色圆圈 是客户端 UI 应用还是服务器端应用? 请接受并使用usr的方法。我认为你有分区问题。看看problem described here。 【参考方案1】:

TPL Dataflow 是一个可能的候选对象。这是一个演示,它接收整数流并将它们打印到控制台。您将MaxDegreeOfParallelism 设置为您希望并行旋转的线程数:

void Main()

    var actionBlock = new ActionBlock<int>(
            i => Console.WriteLine(i), 
            new ExecutionDataflowBlockOptions MaxDegreeOfParallelism = 16);

    foreach (var i in Enumerable.Range(0, 200))
    
        actionBlock.Post(i);
    

如果您想拥有多个生产者/消费者,这也可以很好地扩展。

【讨论】:

我认为这是最好的答案【参考方案2】:

这是执行此操作的手动方式。

你需要一个队列。队列是待处理任务的序列。您必须出列并将它们放入工作任务列表中。当任务完成时,将其从工作任务列表中删除并从队列中取出另一个。主线程控制这个过程。以下是如何执行此操作的示例。

对于测试,我使用了整数列表,但它应该适用于其他类型,因为它使用泛型。

private static void Main()

    Random r = new Random();
    var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();

    ParallelQueue(items, DoWork);


private static void ParallelQueue<T>(List<T> items, Action<T> action)

    Queue pending = new Queue(items);
    List<Task> working = new List<Task>();

    while (pending.Count + working.Count != 0)
    
        if (pending.Count != 0 && working.Count < 16)  // Maximum tasks
        
            var item = pending.Dequeue(); // get item from queue
            working.Add(Task.Run(() => action((T)item))); // run task
        
        else
        
            Task.WaitAny(working.ToArray());
            working.RemoveAll(x => x.IsCompleted); // remove finished tasks
        
    


private static void DoWork(int i) // do your work here.

    // this is just an example
    Task.Delay(i).Wait(); 
    Console.WriteLine(i);

如果您遇到如何为自己实施 DoWork 的问题,请告诉我。因为如果您更改方法签名,您可能需要进行一些更改。

更新

您也可以使用异步等待来执行此操作,而不会阻塞主线程。

private static void Main()

    Random r = new Random();
    var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();

    Task t = ParallelQueue(items, DoWork);

    // able to do other things.

    t.Wait();


private static async Task ParallelQueue<T>(List<T> items, Func<T, Task> func)

    Queue pending = new Queue(items);
    List<Task> working = new List<Task>();

    while (pending.Count + working.Count != 0)
    
        if (working.Count < 16 && pending.Count != 0)
        
            var item = pending.Dequeue();
            working.Add(Task.Run(async () => await func((T)item)));
        
        else
        
            await Task.WhenAny(working);
            working.RemoveAll(x => x.IsCompleted);
        
    


private static async Task DoWork(int i)

    await Task.Delay(i);

【讨论】:

你为什么使用new Task而不是Task.Run?在服务器代码中使用Wait 进行阻塞通常是个坏主意。 我刚刚让路了。 Task.Delay.Wait 只是一个例子。你可以在DoWork 中做任何你想做的事情。我很感激你帮助了我。我修复了代码。现在好多了(?)....我在多线程方面没有经验。我也是编程新手(只有 1 年)所以请耐心等待;)@avo 我仍然认为有一种非阻塞方式可以做到这一点(没有Wait),但我要撤回我的反对票。【参考方案3】:
var workitems = ... /*e.g. Enumerable.Range(0, 1000000)*/;
SingleItemPartitioner.Create(workitems)
 .AsParallel()
 .AsOrdered()
 .WithDegreeOfParallelism(16)
 .WithMergeOptions(ParallelMergeOptions.NotBuffered)
 .ForAll(i =>  Thread.Slee(1000); Console.WriteLine(i); );

这应该就是您所需要的。我忘记了这些方法是如何准确命名的......看看文档。

在休眠 1 秒后打印到控制台进行测试(此示例代码就是这样做的)。

【讨论】:

我的回答被接受了。但我认为不应该。我认为 OP 有这里描述的 ***.com/questions/33869830/… .workitems.Select(x =&gt; x) 需要为项目列表修复此并行。您的方法会将列表分成块,这不是 OP 想要的。所以将列表更改为 ienumerable 将解决他的问题。 @M.kazemAkhgary 这实际上是一个好点,我忘了分区。这确实是一个糟糕的 TPL 默认值,另一个。【参考方案4】:

另一种选择是使用BlockingCollection&lt;T&gt; 作为文件读取器线程和 16 个上传器线程之间的队列。每个上传者线程都会循环使用阻塞集合,直到完成。

而且,如果您想限制队列中的内存消耗,您可以设置阻塞集合的上限,以便文件读取器线程在缓冲区达到容量时暂停。这在您可能需要限制每个用户/API 调用使用的内存的服务器环境中特别有用。

// Create a buffer of 4 chunks between the file reader and the senders
BlockingCollection<Chunk> queue = new BlockingCollection<Chunk>(4);

// Create a cancellation token source so you can stop this gracefully
CancellationTokenSource cts = ...

文件读取线程

...
queue.Add(chunk, cts.Token);
...
queue.CompleteAdding();

发送话题

for(int i = 0; i < 16; i++)

   Task.Run(() => 
      foreach (var chunk in queue.GetConsumingEnumerable(cts.Token))
      
          .. do the upload
      
   );

【讨论】:

以上是关于以并行和顺序方式执行N个线程的主要内容,如果未能解决你的问题,请参考以下文章

我的多线程-多线程必知的N个常识

Jmeter--多个线程组顺序执行和并行执行

jmeter如何并行执行多个线程组

python面试题之多线程好吗?列举一些让Python代码以并行方式运行的方法

并行计算 cv::parallel_for_() 函数

testng多线程并行执行测试