并行执行多对并发任务

Posted

技术标签:

【中文标题】并行执行多对并发任务【英文标题】:Execute multiple pairs of concurrent tasks in parallel 【发布时间】:2021-04-08 11:12:27 【问题描述】:

详情:

我有一个游戏,两个独立的 AI 互相对战。 每个 AI 都有自己的任务。 两个任务需要同时启动,需要带一些参数并返回一个值。 现在我想并行运行 100-200 个游戏(每两个任务)。

我现在遇到的问题是这两个任务没有一起启动。只要有一些免费资源,它们就会完全随机启动。

代码:

我目前的做法如下。

我有一个输入对象列表,其中包含一些参数。 使用 Parallel.ForEach,我为每个输入对象创建了一个游戏和两个 AI。 哪个 AI 先完成游戏,另一个 AI 会停止另一个 AI,使用 CancellationToken 进行相同的游戏。 所有返回值都保存在 ConcurrentBag 中。

因为每个游戏的两个 AI 任务没有一起启动,所以我添加了一个 AutoResetEvent。我希望我可以等待一个任务,直到第二个任务开始,但 AutoResetEvent.WaitOne 会阻止所有资源。所以 AutoResetEvent 的结果是第一个 AI 任务正在启动并等待第二个任务启动,但由于它们不再释放线程,所以它们永远等待。

        private ConcurrentBag<Individual> TrainKis(List<Individual> population) 
            ConcurrentBag<Individual> resultCollection = new ConcurrentBag<Individual>();
            ConcurrentBag<Individual> referenceCollection = new ConcurrentBag<Individual>();

            Parallel.ForEach(population, individual =>
            
                GameManager gm = new GameManager();

                CancellationTokenSource c = new CancellationTokenSource();
                CancellationToken token = c.Token;
                AutoResetEvent waitHandle = new AutoResetEvent(false);

                KI_base eaKI = new KI_Stupid(gm, individual.number, "KI-" + individual.number, Color.FromArgb(255, 255, 255));
                KI_base referenceKI = new KI_Stupid(gm, 999, "REF-" + individual.number, Color.FromArgb(0, 0, 0));
                Individual referenceIndividual = CreateIndividual(individual.number, 400, 2000);

                var t1 = referenceKI.Start(token, waitHandle, referenceIndividual).ContinueWith(taskInfo => 
                    c.Cancel();
                    return taskInfo.Result;
                ).Result;
                var t2 = eaKI.Start(token, waitHandle, individual).ContinueWith(taskInfo =>  
                    c.Cancel(); 
                    return taskInfo.Result; 
                ).Result;

                referenceCollection.Add(t1);
                resultCollection.Add(t2);
            );

            return resultCollection;
        

这是我等待第二个AI播放的AI的启动方法:

            public Task<Individual> Start(CancellationToken _ct, AutoResetEvent _are, Individual _i) 
                i = _i;
                gm.game.kis.Add(this);
                if (gm.game.kis.Count > 1) 
                    _are.Set();
                    return Task.Run(() => Play(_ct));
                
                else 
                    _are.WaitOne();
                    return Task.Run(() => Play(_ct));
                
            

还有简化的玩法

public override Individual Play(CancellationToken ct) 
            Console.WriteLine($"player.username started.");
            while (Constants.TOWN_NUMBER*0.8 > player.towns.Count || player.towns.Count == 0) 
                try 
                    Thread.Sleep((int)(Constants.TOWN_GROTH_SECONDS * 1000 + 10));
                
                catch (Exception _ex) 
                    Console.WriteLine($"player.username error: _ex");
                
                
                //here are the actions of the AI (I removed them for better overview)

                if (ct.IsCancellationRequested) 
                    return i;
                
            
            if (Constants.TOWN_NUMBER * 0.8 <= player.towns.Count) 
                winner = true;
                return i;
            
            return i;
        

有没有更好的方法来做到这一点,保留所有东西但确保每个游戏中的两个 KI-Tasks 同时启动?

【问题讨论】:

两位AI玩家如何互动?他们是否读写某些共享状态?这些读/写操作是使用lock 或其他同步原语同步的,还是无锁的? 两个 AI 都与同一个游戏管理器交互。游戏管理器的某些部分(如包含游戏当前状态的四叉树)被锁定以防止错误。 是否可以选择使用coroutines而不是Tasks来协调每对AI玩家?这个想法是将每个 AI 玩家公开为 iterator(一种返回 IEnumerable 并包含 yield 语句的方法)而不是 Task,并且对于每个“展开”两个迭代器的游戏都有一个 Task一步一个脚印。 两位玩家会不会总是轮流出手?所以 KI1 做了一个动作,然后 KI2、KI1……等等?两个 KI 都需要完全免费玩... 哦,我明白了。您能否在您的问题中包含一个 AI 播放器的异步 Start 方法的简化示例,以便我们能够提供替代建议? (而不是协程) 【参考方案1】:

我的建议是更改Play 方法的签名,使其返回Task&lt;Individual&gt; 而不是Individual,并将对Thread.Sleep 的调用替换为await Task.Delay。这个小改动应该会对 AI 玩家的响应产生显着的积极影响,因为他们不会阻塞任何线程,ThreadPool 的小线程池将得到最佳利用。

public override async Task<Individual> Play(CancellationToken ct)

    Console.WriteLine($"player.username started.");
    while (Constants.TOWN_NUMBER * 0.8 > player.towns.Count || player.towns.Count == 0)
    
        //...
        await Task.Delay((int)(Constants.TOWN_GROTH_SECONDS * 1000 + 10));
        //...
    

您还可以考虑将方法名称从Play 更改为PlayAsync,以符合guidelines。

然后你应该刮掉Parallel.ForEach方法,因为it is not async-friendly,而是将每个individual投影到Task,将所有任务捆绑在一个数组中,并等待它们全部使用Task.WaitAll方法完成(或者如果你想去async-all-the-way,可以使用await Task.WhenAll)。

Task[] tasks = population.Select(async individual =>

    GameManager gm = new GameManager();
    CancellationTokenSource c = new CancellationTokenSource();

    //...

    var task1 = referenceKI.Start(token, waitHandle, referenceIndividual);
    var task2 = eaKI.Start(token, waitHandle, individual);

    await Task.WhenAny(task1, task2);
    c.Cancel();
    await Task.WhenAll(task1, task2);
    var result1 = await task1;
    var result2 = await task2;
    referenceCollection.Add(result1);
    resultCollection.Add(result2);
).ToArray();

Task.WaitAll(tasks);

这样您将获得最大并发,这可能并不理想,因为您的机器的 CPU 或 RAM 或 CPU RAM 带宽可能会饱和。您可以查看 here 以了解限制并发异步操作数量的方法。

【讨论】:

@theoretisch 简而言之,在程序中的任何位置定位所有.Wait().Result 调用,并尽可能多地消除它们(理想情况下是全部)。在大多数情况下,这些对程序的响应性或可扩展性是有害的。 刚刚实现了它,它工作得很好。即使同时有 200 多个游戏。太感谢了。所有的链接也很有帮助! @theoretisch 快速而肮脏的解决方法是通过在程序开始时调用ThreadPool.SetMinThreads(1000, 1000) 来增加ThreadPool 立即按需创建的线程数。尽管这可能会解决您的问题,但这会非常浪费,因为每个线程默认分配1 MB 的 RAM,仅用于其堆栈。因此,如果创建了约 400 个线程,您将浪费约 400MB 的 RAM,专门用于大部分时间都在休眠的线程。 :-)

以上是关于并行执行多对并发任务的主要内容,如果未能解决你的问题,请参考以下文章

同步 ,异步,并发/并行,串行

并发,并行

串行并行并发

串行并行并发

帮你快速理解同步 ,异步,并发/并行,串行

1.多线程-了解多线程与高并发