管道卡住了——堆栈溢出工作没有其他解决方案

Posted

技术标签:

【中文标题】管道卡住了——堆栈溢出工作没有其他解决方案【英文标题】:Pipes are getting stuck--no other solution on stack overflow working 【发布时间】:2019-09-07 16:23:59 【问题描述】:

(更新)我正在构建一个模块来分发基于代理的模型,其想法是将模型拆分为多个进程,然后当代理到达边界时,它们被传递给处理该区域的处理器。我可以设置进程并在没有通信的情况下工作,但无法让数据通过管道并更新另一个处理器上的模型段。

我已经在 *** 上尝试了解决方案,并构建了一个简单版本的模型。一旦我将模型对象放入管道中,模型就会挂起(它适用于 python 标准数据类型)。简单的版本只是来回传递代理。

 from pathos.multiprocessing import ProcessPool
 from pathos.helpers import mp
 import copy



 class TestAgent:

 "Agent Class-- Schedule iterates through each agent and \
  executes step function"

 def __init__(self, unique_id, model):
      self.unique_id = unique_id
      self.model = model
      self.type = "agent"

 def step(self):

       pass 
       #print ('     ', self.unique_id, "I have stepped")

class TestModel:

   "Model Class iterates through schedule and executes step function for \
   each agent"

   def __init__(self):
       self.schedule = []
       self.pipe = None
       self.process = None

       for i in range(1000):
           a = TestAgent(i, self)
           self.schedule.append(a)


   def step(self):


       for a in self.schedule:
           a.step()

if __name__ == '__main__':

   pool = ProcessPool(nodes=2)

   #create instance of model
   test_model = TestModel()
   #create copies of model to be run on 2 processors
   test1 = copy.deepcopy(test_model)
   #clear schedule
   test1.schedule = []
   #Put in only half the schedule
   for i in range(0,500):
       test1.schedule.append(test_model.schedule[i])  
   #Give process tracker number
   test1.process = 1
   #repeat for other processor
   test2= copy.deepcopy(test_model)
   test2.schedule = []
   for i in range(500,1000):
       test2.schedule.append(test_model.schedule[i])
   test2.process = 2

   #create pipe
   end1, end2 = mp.Pipe()

   #Main run function for each process
   def run(model, pipe):

      for i in range(5):
          print (model.process)#, [a.unique_id for a in model.schedule])
          model.step() # IT HANGS AFTER INITIAL STEP
          print ("send")
          pipe.send(model.schedule)
          print ("closed")
          sched = pipe.recv()
          print ("received")
          model.schedule = sched



   pool.map(run, [test1, test2], [end1,end2])

代理应切换处理器并执行其打印功能。 (我的下一个问题是同步处理器,使它们保持在每一步,但一次只做一件事。)

【问题讨论】:

我是pathos 作者。如果 (1) 您制作了一个不使用 mesa 但仍会产生错误的简化版本的代码,和/或 (2) 如果您有任何问题,您还可以发布您的回溯,这将有助于人们回答您的问题。它似乎只是挂起,对吗?如果您有管道阻塞并且没有任何信息发送给它,这通常是可能发生的情况。我还建议您尝试SerialPoolThreadPool,看看您是否可以将其识别为与ProcessPool 交互的问题。 如果您在发送 python3 后删除pipe.close,您的示例将运行。我不明白你的错误。你能更具体一点吗? (另外,你的例子有一些缩进错误) 绅士,谢谢。我得到了小版本的工作(删除 pipe.close()),并更新了上面的代码以删除 mesa 依赖项。然后,我将这些相同的修复程序应用于我的实际模型,但仍然遇到相同的问题。我能够通过将代理数量增加到 1000 来重新创建这些问题。它只是挂起,需要一个 ctrl-break,我得到:从多处理/连接:第 287 行 _in_send_bytes [ov.event], False, INFINITE --full tr​​aceback以上 【参考方案1】:

我让它工作了。我超出了 python (8192) 中的管道缓冲区限制。如果代理持有模型的副本作为属性,则尤其如此。下面是上述代码的工作版本,它一次通过一个代理。它使用 Pimpler 来获取所有代理的大小。

from pathos.multiprocessing import ProcessPool
from pathos.helpers import mp
import copy

# do a blocking map on the chosen function

class TestAgent:

"Agent Class-- Schedule iterates through each agent and \
executes step function"

   def __init__(self, unique_id, model):
       self.unique_id = unique_id
       self.type = "agent"

   def step(self):
       pass 


class TestModel:

   "Model Class iterates through schedule and executes step function for \
   each agent"

   def __init__(self):
       from pympler import asizeof 

       self.schedule = []
       self.pipe = None
       self.process = None
       self.size = asizeof.asizeof


       for i in range(1000):
           a = TestAgent(i, self)
           self.schedule.append(a)


   def step(self):


       for a in self.schedule:
           a.step()

if __name__ == '__main__':

   pool = ProcessPool(nodes=2)

   #create instance of model
   test_model = TestModel()
   #create copies of model to be run on 2 processors
   test1 = copy.deepcopy(test_model)
   #clear schedule
   test1.schedule = []
   #Put in only half the schedule
   for i in range(0,500):
       test1.schedule.append(test_model.schedule[i])  
   #Give process tracker number
   test1.process = 1
   #repeat for other processor
   test2= copy.deepcopy(test_model)
   test2.schedule = []
   for i in range(500,1000):
       test2.schedule.append(test_model.schedule[i])
   test2.process = 2

   #create pipe
   end1, end2 = mp.Pipe()

   #Main run function for each process
   def run(model, pipe):

      for i in range(5):
        agents = []
        print (model.process, model.size(model.schedule) ) 
        model.step() # IT HANGS AFTER INITIAL STEP
        #agent_num = list(model.schedule._agents.keys())
        for agent in model.schedule[:]:
            model.schedule.remove(agent)
            pipe.send(agent)
            agent = pipe.recv()
            agents.append(agent)
        print (model.process, "all agents received")
        for agent in agents: 
            model.schedule.append(agent)

        print (model.process, len(model.schedule))



   pool.map(run, [test1, test2], [end1,end2])

Mike McKerns 和 Thomas Moreau -- 感谢您帮助我走上正确的道路。

【讨论】:

以上是关于管道卡住了——堆栈溢出工作没有其他解决方案的主要内容,如果未能解决你的问题,请参考以下文章

win7下安装了delphi2010,打开时显示一半就卡住了,内存很快被占满,并显示堆栈溢出,咋回事啊

怎么防止堆栈溢出

怎么解决 LINUX 堆栈溢出内存的问题

Windows 10 资源管理器路径最大。字符(堆栈溢出的其他解决方案不起作用)

如何在 C++ 中处理或避免堆栈溢出

vb 堆栈溢出