并行执行多对并发任务
Posted
技术标签:
【中文标题】并行执行多对并发任务【英文标题】:Execute multiple pairs of concurrent tasks in parallel 【发布时间】:2021-04-08 11:12:27 【问题描述】:详情:
我有一个游戏,两个独立的 AI 互相对战。 每个 AI 都有自己的任务。 两个任务需要同时启动,需要带一些参数并返回一个值。 现在我想并行运行 100-200 个游戏(每两个任务)。
我现在遇到的问题是这两个任务没有一起启动。只要有一些免费资源,它们就会完全随机启动。
代码:
我目前的做法如下。
我有一个输入对象列表,其中包含一些参数。 使用 Parallel.ForEach,我为每个输入对象创建了一个游戏和两个 AI。 哪个 AI 先完成游戏,另一个 AI 会停止另一个 AI,使用 CancellationToken 进行相同的游戏。 所有返回值都保存在 ConcurrentBag 中。因为每个游戏的两个 AI 任务没有一起启动,所以我添加了一个 AutoResetEvent。我希望我可以等待一个任务,直到第二个任务开始,但 AutoResetEvent.WaitOne 会阻止所有资源。所以 AutoResetEvent 的结果是第一个 AI 任务正在启动并等待第二个任务启动,但由于它们不再释放线程,所以它们永远等待。
private ConcurrentBag<Individual> TrainKis(List<Individual> population)
ConcurrentBag<Individual> resultCollection = new ConcurrentBag<Individual>();
ConcurrentBag<Individual> referenceCollection = new ConcurrentBag<Individual>();
Parallel.ForEach(population, individual =>
GameManager gm = new GameManager();
CancellationTokenSource c = new CancellationTokenSource();
CancellationToken token = c.Token;
AutoResetEvent waitHandle = new AutoResetEvent(false);
KI_base eaKI = new KI_Stupid(gm, individual.number, "KI-" + individual.number, Color.FromArgb(255, 255, 255));
KI_base referenceKI = new KI_Stupid(gm, 999, "REF-" + individual.number, Color.FromArgb(0, 0, 0));
Individual referenceIndividual = CreateIndividual(individual.number, 400, 2000);
var t1 = referenceKI.Start(token, waitHandle, referenceIndividual).ContinueWith(taskInfo =>
c.Cancel();
return taskInfo.Result;
).Result;
var t2 = eaKI.Start(token, waitHandle, individual).ContinueWith(taskInfo =>
c.Cancel();
return taskInfo.Result;
).Result;
referenceCollection.Add(t1);
resultCollection.Add(t2);
);
return resultCollection;
这是我等待第二个AI播放的AI的启动方法:
public Task<Individual> Start(CancellationToken _ct, AutoResetEvent _are, Individual _i)
i = _i;
gm.game.kis.Add(this);
if (gm.game.kis.Count > 1)
_are.Set();
return Task.Run(() => Play(_ct));
else
_are.WaitOne();
return Task.Run(() => Play(_ct));
还有简化的玩法
public override Individual Play(CancellationToken ct)
Console.WriteLine($"player.username started.");
while (Constants.TOWN_NUMBER*0.8 > player.towns.Count || player.towns.Count == 0)
try
Thread.Sleep((int)(Constants.TOWN_GROTH_SECONDS * 1000 + 10));
catch (Exception _ex)
Console.WriteLine($"player.username error: _ex");
//here are the actions of the AI (I removed them for better overview)
if (ct.IsCancellationRequested)
return i;
if (Constants.TOWN_NUMBER * 0.8 <= player.towns.Count)
winner = true;
return i;
return i;
有没有更好的方法来做到这一点,保留所有东西但确保每个游戏中的两个 KI-Tasks 同时启动?
【问题讨论】:
两位AI玩家如何互动?他们是否读写某些共享状态?这些读/写操作是使用lock
或其他同步原语同步的,还是无锁的?
两个 AI 都与同一个游戏管理器交互。游戏管理器的某些部分(如包含游戏当前状态的四叉树)被锁定以防止错误。
是否可以选择使用coroutines而不是Task
s来协调每对AI玩家?这个想法是将每个 AI 玩家公开为 iterator(一种返回 IEnumerable
并包含 yield
语句的方法)而不是 Task
,并且对于每个“展开”两个迭代器的游戏都有一个 Task
一步一个脚印。
两位玩家会不会总是轮流出手?所以 KI1 做了一个动作,然后 KI2、KI1……等等?两个 KI 都需要完全免费玩...
哦,我明白了。您能否在您的问题中包含一个 AI 播放器的异步 Start
方法的简化示例,以便我们能够提供替代建议? (而不是协程)
【参考方案1】:
我的建议是更改Play
方法的签名,使其返回Task<Individual>
而不是Individual
,并将对Thread.Sleep
的调用替换为await Task.Delay
。这个小改动应该会对 AI 玩家的响应产生显着的积极影响,因为他们不会阻塞任何线程,ThreadPool
的小线程池将得到最佳利用。
public override async Task<Individual> Play(CancellationToken ct)
Console.WriteLine($"player.username started.");
while (Constants.TOWN_NUMBER * 0.8 > player.towns.Count || player.towns.Count == 0)
//...
await Task.Delay((int)(Constants.TOWN_GROTH_SECONDS * 1000 + 10));
//...
您还可以考虑将方法名称从Play
更改为PlayAsync
,以符合guidelines。
然后你应该刮掉Parallel.ForEach
方法,因为it is not async-friendly,而是将每个individual
投影到Task
,将所有任务捆绑在一个数组中,并等待它们全部使用Task.WaitAll
方法完成(或者如果你想去async-all-the-way,可以使用await Task.WhenAll
)。
Task[] tasks = population.Select(async individual =>
GameManager gm = new GameManager();
CancellationTokenSource c = new CancellationTokenSource();
//...
var task1 = referenceKI.Start(token, waitHandle, referenceIndividual);
var task2 = eaKI.Start(token, waitHandle, individual);
await Task.WhenAny(task1, task2);
c.Cancel();
await Task.WhenAll(task1, task2);
var result1 = await task1;
var result2 = await task2;
referenceCollection.Add(result1);
resultCollection.Add(result2);
).ToArray();
Task.WaitAll(tasks);
这样您将获得最大并发,这可能并不理想,因为您的机器的 CPU 或 RAM 或 CPU RAM 带宽可能会饱和。您可以查看 here 以了解限制并发异步操作数量的方法。
【讨论】:
@theoretisch 简而言之,在程序中的任何位置定位所有.Wait()
和.Result
调用,并尽可能多地消除它们(理想情况下是全部)。在大多数情况下,这些对程序的响应性或可扩展性是有害的。
刚刚实现了它,它工作得很好。即使同时有 200 多个游戏。太感谢了。所有的链接也很有帮助!
@theoretisch 快速而肮脏的解决方法是通过在程序开始时调用ThreadPool.SetMinThreads(1000, 1000)
来增加ThreadPool
立即按需创建的线程数。尽管这可能会解决您的问题,但这会非常浪费,因为每个线程默认分配1 MB 的 RAM,仅用于其堆栈。因此,如果创建了约 400 个线程,您将浪费约 400MB 的 RAM,专门用于大部分时间都在休眠的线程。 :-)以上是关于并行执行多对并发任务的主要内容,如果未能解决你的问题,请参考以下文章