Dask:如何有效地分配遗传搜索算法?
Posted
技术标签:
【中文标题】Dask:如何有效地分配遗传搜索算法?【英文标题】:Dask: How to efficiently distribute a genetic search algorithm? 【发布时间】:2018-05-06 07:50:42 【问题描述】:我已经实现了一种遗传搜索算法并尝试对其进行并行化,但性能却很糟糕(比单线程还差)。我怀疑这是由于通信开销造成的。
我在下面提供了伪代码,但本质上遗传算法会创建一个大型“染色体”对象池,然后运行多次迭代:
-
根据每条染色体在“世界”中的表现对每条染色体进行评分。世界在迭代过程中保持静态。
根据上一步计算的分数随机选择一个新的总体
转到步骤 1 进行 n 次迭代
评分算法(第 1 步)是主要瓶颈,因此分发此代码的处理似乎很自然。
我遇到了几个希望能得到帮助的问题:
-
如何将计算出的分数与
map()
传递给评分函数的对象相关联,即将每个持有分数的Future
链接回Chromosome
?通过让calculate_scores()
方法返回对象,我以一种非常笨拙的方式完成了此操作,但实际上,如果有更好的方法来维护链接,我只需要发回float
。
评分函数的并行处理工作正常,但map()
需要很长时间才能遍历所有对象。但是,与单线程版本相比,随后对draw_chromosome_from_pool()
的调用运行非常缓慢,以至于我还没有看到它完成。我不知道是什么原因造成的,因为该方法总是在单线程版本中快速完成。即使在所有期货都完成之后,是否有一些 IPC 继续将染色体拉回本地过程?本地流程是否以某种方式降低了优先级?
我担心每个周期构建/重建池的整体迭代性质会导致向工作人员传输大量数据。这个问题的根源在于:Dask实际上什么时候以及什么时候将数据来回发送到工作池。即 Environment() 与 Chromosome() 什么时候分发,结果如何/何时返回?我已经阅读了docs,但要么没有找到正确的细节,要么我太笨了,无法理解。
理想情况下,我认为(但可以更正)我想要的是一个分布式架构,其中每个工作人员在本地“永久”保存Environment()
数据,然后分配Chromosome()
实例数据以进行评分,几乎没有重复迭代之间来回未更改的 Chromosome() 数据。
很长的帖子,如果您花时间阅读此文,已经谢谢您了!
class Chromosome(object): # Small size: several hundred bytes per instance
def get_score():
# Returns a float
def set_score(i):
# Stores a a float
class Environment(object): # Large size: 20-50Mb per instance, but only one instance
def calculate_scores(chromosome):
# Slow calculation using attributes from chromosome and instance data
chromosome.set_score(x)
return chromosome
class Evolver(object):
def draw_chromosome_from_pool(self, max_score):
while True:
individual = np.random.choice(self.chromosome_pool)
selection_chance = np.random.uniform()
if selection_chance < individual.get_score() / max_score:
return individual
def run_evolution()
self.dask_client = Client()
self.chromosome_pool = list()
for i in range(10000):
self.chromosome_pool.append( Chromosome() )
world_data = LoadWorldData() # Returns a pandas Dataframe
self.world = Environment(world_data)
iterations = 1000
for i in range(iterations):
futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)
for future in as_completed(futures):
c = future.result()
highest_score = max(highest_score, c.get_score())
new_pool = set()
while len(new_pool)<self.pool_size:
mother = self.draw_chromosome_from_pool(highest_score)
# do stuff to build a new pool
【问题讨论】:
【参考方案1】:是的,每次拨打该线路时
futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)
您正在序列化self.world
,它很大。你可以在循环之前只做一次
future_world = client.scatter(self.world, broadcast=True)
然后在循环中
futures = self.dask_client.map(lambda ch: Environment.calculate_scores(future_world, ch), self.chromosome_pool)
将使用worker上已经存在的副本(或一个简单的功能)。关键是 future_world
只是一个指向已经分发的东西的指针,但是 dask 会为你处理这个。
关于哪个染色体的问题:使用as_completed
会破坏您将它们提交给map
的顺序,但这对于您的代码来说不是必需的。您可以使用wait
来处理所有工作何时完成,或者简单地迭代future.result()
s(它将等待每个任务完成),然后您将保留染色体池中的顺序。
【讨论】:
谢谢 - 这很有帮助。 您能解释一下map() 分发的染色体对象是如何在本地进程和worker 之间管理的吗?具体来说,如果 Environment.calculate_score() 修改了一个 Chromosome 对象(例如设置一个属性),它如何表现为 IPC?我试图弄清楚calculate_score() 是否更好地更新直接传递给它的染色体,或者返回一个值元组,然后让本地进程遍历池并集中应用更新。 以上面建议的形式使用 map() 会产生“TypeError: zip argument #1 must support iteration”,但是“for c in self.chromosome_pool: futures.append( (self.dask_client.submit( Environment.calculate_scores, future_world, c)" 基本上是等价的并且可以工作。 我认为map
的这种形式应该可以工作。通常,Dask 不应该通过突变工作,而是通过返回新结果来工作。在 python 中,没有可靠的方法来知道某个变量的值/内容是否发生了变化。以上是关于Dask:如何有效地分配遗传搜索算法?的主要内容,如果未能解决你的问题,请参考以下文章