Dask:如何有效地分配遗传搜索算法?

Posted

技术标签:

【中文标题】Dask:如何有效地分配遗传搜索算法?【英文标题】:Dask: How to efficiently distribute a genetic search algorithm? 【发布时间】:2018-05-06 07:50:42 【问题描述】:

我已经实现了一种遗传搜索算法并尝试对其进行并行化,但性能却很糟糕(比单线程还差)。我怀疑这是由于通信开销造成的。

我在下面提供了伪代码,但本质上遗传算法会创建一个大型“染色体”对象池,然后运行多次迭代:

    根据每条染色体在“世界”中的表现对每条染色体进行评分。世界在迭代过程中保持静态。 根据上一步计算的分数随机选择一个新的总体 转到步骤 1 进行 n 次迭代

评分算法(第 1 步)是主要瓶颈,因此分发此代码的处理似乎很自然。

我遇到了几个希望能得到帮助的问题:

    如何将计算出的分数与map() 传递给评分函数的对象相关联,即将每个持有分数的Future 链接回Chromosome?通过让calculate_scores() 方法返回对象,我以一种非常笨拙的方式完成了此操作,但实际上,如果有更好的方法来维护链接,我只需要发回float。 评分函数的并行处理工作正常,但map() 需要很长时间才能遍历所有对象。但是,与单线程版本相比,随后对draw_chromosome_from_pool() 的调用运行非常缓慢,以至于我还没有看到它完成。我不知道是什么原因造成的,因为该方法总是在单线程版本中快速完成。即使在所有期货都完成之后,是否有一些 IPC 继续将染色体拉回本地过程?本地流程是否以某种方式降低了优先级? 我担心每个周期构建/重建池的整体迭代性质会导致向工作人员传输大量数据。这个问题的根源在于:Dask实际上什么时候以及什么时候将数据来回发送到工作池。即 Environment() 与 Chromosome() 什么时候分发,结果如何/何时返回?我已经阅读了docs,但要么没有找到正确的细节,要么我太笨了,无法理解。

理想情况下,我认为(但可以更正)我想要的是一个分布式架构,其中每个工作人员在本地“永久”保存Environment() 数据,然后分配Chromosome() 实例数据以进行评分,几乎没有重复迭代之间来回未更改的 Chromosome() 数据。

很长的帖子,如果您花时间阅读此文,已经谢谢您了!

class Chromosome(object):    # Small size: several hundred bytes per instance 
     def get_score():
          # Returns a float
     def set_score(i):
          # Stores a a float

class Environment(object):   # Large size: 20-50Mb per instance, but only one instance
         def calculate_scores(chromosome):
             # Slow calculation using attributes from chromosome and instance data
             chromosome.set_score(x)
             return chromosome

class Evolver(object):
    def draw_chromosome_from_pool(self, max_score):
        while True:
            individual = np.random.choice(self.chromosome_pool)
            selection_chance = np.random.uniform()
            if selection_chance < individual.get_score() / max_score:
                return individual   

    def run_evolution()
         self.dask_client = Client()
         self.chromosome_pool = list()
         for i in range(10000):
             self.chromosome_pool.append( Chromosome() )

         world_data = LoadWorldData() # Returns a pandas Dataframe
         self.world = Environment(world_data)

         iterations = 1000
         for i in range(iterations):
             futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)
             for future in as_completed(futures):
                  c = future.result()
                  highest_score = max(highest_score, c.get_score()) 

             new_pool = set()
             while len(new_pool)<self.pool_size:
                 mother = self.draw_chromosome_from_pool(highest_score)
                  # do stuff to build a new pool

【问题讨论】:

【参考方案1】:

是的,每次拨打该线路时

futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)

您正在序列化self.world,它很大。你可以在循环之前只做一次

future_world = client.scatter(self.world, broadcast=True)

然后在循环中

futures = self.dask_client.map(lambda ch: Environment.calculate_scores(future_world, ch), self.chromosome_pool)

将使用worker上已经存在的副本(或一个简单的功能)。关键是 future_world 只是一个指向已经分发的东西的指针,但是 dask 会为你处理这个。

关于哪个染色体的问题:使用as_completed 会破坏您将它们提交给map 的顺序,但这对于您的代码来说不是必需的。您可以使用wait 来处理所有工作何时完成,或者简单地迭代future.result()s(它将等待每个任务完成),然后您将保留染色体池中的顺序。

【讨论】:

谢谢 - 这很有帮助。 您能解释一下map() 分发的染色体对象是如何在本地进程和worker 之间管理的吗?具体来说,如果 Environment.calculate_score() 修改了一个 Chromosome 对象(例如设置一个属性),它如何表现为 IPC?我试图弄清楚calculate_score() 是否更好地更新直接传递给它的染色体,或者返回一个值元组,然后让本地进程遍历池并集中应用更新。 以上面建议的形式使用 map() 会产生“TypeError: zip argument #1 must support iteration”,但是“for c in self.chromosome_pool: futures.append( (self.dask_client.submit( Environment.calculate_scores, future_world, c)" 基本上是等价的并且可以工作。 我认为map 的这种形式应该可以工作。通常,Dask 不应该通过突变工作,而是通过返回新结果来工作。在 python 中,没有可靠的方法来知道某个变量的值/内容是否发生了变化。

以上是关于Dask:如何有效地分配遗传搜索算法?的主要内容,如果未能解决你的问题,请参考以下文章

ga遗传算法如何提高精度

领域搜索算法 是什么 和遗传算法模拟退火算法禁忌搜索算法模糊优化 算法微粒群算法关系

遗传算法

遗传算法的中心思想

路径规划基于遗传算法实现物流中心配送方案matlab源码

优化分配基于matlab遗传算法求解二次分配优化问题含Matlab源码 2391期