多处理全局变量更新未返回给父级
Posted
技术标签:
【中文标题】多处理全局变量更新未返回给父级【英文标题】:multiprocessing global variable updates not returned to parent 【发布时间】:2012-06-18 19:07:33 【问题描述】:我正在尝试从子进程返回值,但不幸的是这些值是不可取的。因此,我在线程模块中成功使用了全局变量,但在使用多处理模块时无法检索子进程中完成的更新。我希望我错过了什么。
最后打印的结果总是与给定变量dataDV03
和dataDV04
的初始值相同。子进程正在更新这些全局变量,但这些全局变量在父进程中保持不变。
import multiprocessing
# NOT ABLE to get python to return values in passed variables.
ants = ['DV03', 'DV04']
dataDV03 = ['', '']
dataDV04 = 'driver': '', 'status': ''
def getDV03CclDrivers(lib): # call global variable
global dataDV03
dataDV03[1] = 1
dataDV03[0] = 0
# eval( 'CCL.' + lib + '.' + lib + '( "DV03" )' ) these are unpicklable instantiations
def getDV04CclDrivers(lib, dataDV04): # pass global variable
dataDV04['driver'] = 0 # eval( 'CCL.' + lib + '.' + lib + '( "DV04" )' )
if __name__ == "__main__":
jobs = []
if 'DV03' in ants:
j = multiprocessing.Process(target=getDV03CclDrivers, args=('LORR',))
jobs.append(j)
if 'DV04' in ants:
j = multiprocessing.Process(target=getDV04CclDrivers, args=('LORR', dataDV04))
jobs.append(j)
for j in jobs:
j.start()
for j in jobs:
j.join()
print 'Results:\n'
print 'DV03', dataDV03
print 'DV04', dataDV04
我无法发布我的问题,因此将尝试编辑原件。
这是不可腌制的对象:
In [1]: from CCL import LORR
In [2]: lorr=LORR.LORR('DV20', None)
In [3]: lorr
Out[3]: <CCL.LORR.LORR instance at 0x94b188c>
这是我使用 multiprocessing.Pool 将实例返回给父级时返回的错误:
Thread getCcl (('DV20', 'LORR'),)
Process PoolWorker-1:
Traceback (most recent call last):
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap
self.run()
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs)
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/pool.py", line 71, in worker
put((job, i, result))
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/queues.py", line 366, in put
return send(obj)
UnpickleableError: Cannot pickle <type 'thread.lock'> objects
In [5]: dir(lorr)
Out[5]:
['GET_AMBIENT_TEMPERATURE',
'GET_CAN_ERROR',
'GET_CAN_ERROR_COUNT',
'GET_CHANNEL_NUMBER',
'GET_COUNT_PER_C_OP',
'GET_COUNT_REMAINING_OP',
'GET_DCM_LOCKED',
'GET_EFC_125_MHZ',
'GET_EFC_COMB_LINE_PLL',
'GET_ERROR_CODE_LAST_CAN_ERROR',
'GET_INTERNAL_SLAVE_ERROR_CODE',
'GET_MAGNITUDE_CELSIUS_OP',
'GET_MAJOR_REV_LEVEL',
'GET_MINOR_REV_LEVEL',
'GET_MODULE_CODES_CDAY',
'GET_MODULE_CODES_CMONTH',
'GET_MODULE_CODES_DIG1',
'GET_MODULE_CODES_DIG2',
'GET_MODULE_CODES_DIG4',
'GET_MODULE_CODES_DIG6',
'GET_MODULE_CODES_SERIAL',
'GET_MODULE_CODES_VERSION_MAJOR',
'GET_MODULE_CODES_VERSION_MINOR',
'GET_MODULE_CODES_YEAR',
'GET_NODE_ADDRESS',
'GET_OPTICAL_POWER_OFF',
'GET_OUTPUT_125MHZ_LOCKED',
'GET_OUTPUT_2GHZ_LOCKED',
'GET_PATCH_LEVEL',
'GET_POWER_SUPPLY_12V_NOT_OK',
'GET_POWER_SUPPLY_15V_NOT_OK',
'GET_PROTOCOL_MAJOR_REV_LEVEL',
'GET_PROTOCOL_MINOR_REV_LEVEL',
'GET_PROTOCOL_PATCH_LEVEL',
'GET_PROTOCOL_REV_LEVEL',
'GET_PWR_125_MHZ',
'GET_PWR_25_MHZ',
'GET_PWR_2_GHZ',
'GET_READ_MODULE_CODES',
'GET_RX_OPT_PWR',
'GET_SERIAL_NUMBER',
'GET_SIGN_OP',
'GET_STATUS',
'GET_SW_REV_LEVEL',
'GET_TE_LENGTH',
'GET_TE_LONG_FLAG_SET',
'GET_TE_OFFSET_COUNTER',
'GET_TE_SHORT_FLAG_SET',
'GET_TRANS_NUM',
'GET_VDC_12',
'GET_VDC_15',
'GET_VDC_7',
'GET_VDC_MINUS_7',
'SET_CLEAR_FLAGS',
'SET_FPGA_LOGIC_RESET',
'SET_RESET_AMBSI',
'SET_RESET_DEVICE',
'SET_RESYNC_TE',
'STATUS',
'_HardwareDevice__componentName',
'_HardwareDevice__hw',
'_HardwareDevice__stickyFlag',
'_LORRBase__logger',
'__del__',
'__doc__',
'__init__',
'__module__',
'_devices',
'clearDeviceCommunicationErrorAlarm',
'getControlList',
'getDeviceCommunicationErrorCounter',
'getErrorMessage',
'getHwState',
'getInternalSlaveCanErrorMsg',
'getLastCanErrorMsg',
'getMonitorList',
'hwConfigure',
'hwDiagnostic',
'hwInitialize',
'hwOperational',
'hwSimulation',
'hwStart',
'hwStop',
'inErrorState',
'isMonitoring',
'isSimulated']
In [6]:
【问题讨论】:
“这些值是不可腌制的” - 你的意思是你打包到你的全局变量中的东西不能被腌制吗?如果是这种情况,那么您不能使用子流程(AFAIK),因为这就是信息在流程之间传递的方式。如果数据能够被腌制,您将需要使用Manager
。
除了回答您的问题外,您不应该“发布”任何内容。因此,您改为编辑很好;在这种情况下,这是正确的做法。
我最近发现的另一种选择是apply_async
回调。回调在父进程中执行。这意味着子进程返回的任何东西都可以传递给回调进程,然后回调进程可以改变全局变量。然而,这需要在回调函数的顶部使用global variableName
声明。
【参考方案1】:
当您使用multiprocessing
打开第二个进程时,会创建一个全新的 Python 实例,它具有自己的全局状态。该全局状态不是共享的,因此子进程对全局变量所做的更改对父进程是不可见的。
此外,multiprocessing
提供的大多数抽象都使用 pickle 来传输数据。使用代理must be pickleable 传输的所有数据;包括Manager
provides 的所有对象。相关引文(我的重点):
确保代理的方法的参数是可挑选的。
并且(在Manager
部分中):
其他进程可以通过使用代理访问共享对象。
Queue
s 也需要可腌制数据;文档没有这么说,但快速测试证实了这一点:
import multiprocessing
import pickle
class Thing(object):
def __getstate__(self):
print 'got pickled'
return self.__dict__
def __setstate__(self, state):
print 'got unpickled'
self.__dict__.update(state)
q = multiprocessing.Queue()
p = multiprocessing.Process(target=q.put, args=(Thing(),))
p.start()
print q.get()
p.join()
输出:
$ python mp.py
got pickled
got unpickled
<__main__.Thing object at 0x10056b350>
可能适合你的一种方法,如果你真的不能腌制数据,那就是找到一种方法将其存储为 ctype
对象;对内存的引用可以是passed to a child process。这对我来说似乎很狡猾。我从来没有做过。但这对您来说可能是一个可能的解决方案。
鉴于您的更新,您似乎需要更多地了解LORR
的内部结构。 LORR
是一个类吗?你能继承它吗?它是其他东西的子类吗?它的 MRO 是什么? (尝试LORR.__mro__
并发布输出,如果它有效。)如果它是一个纯python 对象,则可以对其进行子类化,创建__setstate__
和__getstate__
以启用酸洗。
另一种方法可能是弄清楚如何从LORR
实例中获取相关数据并通过简单的字符串传递它。既然你说你真的只是想调用对象的方法,那为什么不直接用Queue
s 来回发送消息呢?换句话说,像这样(示意性地):
Main Process Child 1 Child 2
LORR 1 LORR 2
child1_in_queue -> get message 'foo'
call 'foo' method
child1_out_queue <- return foo data string
child2_in_queue -> get message 'bar'
call 'bar' method
child2_out_queue <- return bar data string
【讨论】:
@user1459256,考虑我的编辑(在我的帖子底部)。我们需要有关LORR
对象的更多信息来开发可行的方法。
@user1459256,好的,当你说你需要稍后调用方法来获取调用时的当前数据时——这让我觉得你真的不需要转移LORR
对象,但是您宁愿需要传输LORR
对象上的方法返回的数据。但这应该很容易!只需使用通过Queue
传递的消息来告诉子进程调用特定方法,然后让子进程通过返回Queue
返回一个值。
@user1459256,查看我最近的编辑,让我知道那里提出的解决方案是否可行。
抱歉,我一直在慢慢消化这一切。 LORR 对象没有 mro。它是 C++ 代码的 python 接口。我需要对象持续几分钟到几小时。我在想线程不应该被长时间管理。由于我无法将对象传递出子流程,因此我需要在调用时传递方法的结果时让子流程保持活动数小时。这合理吗?如果是这样,这似乎是解决问题的方法。
@user1459256:就我个人而言,我不明白为什么如果您经常与子进程保持沟通,那么为什么它们会成为一个很大的问题。 (或长时间运行的线程)【参考方案2】:
@DBlas 在答案中为您提供了一个快速 url 和对 Manager 类的引用,但我认为它仍然有点模糊,所以我认为它可能对您有所帮助...
import multiprocessing
from multiprocessing import Manager
ants = ['DV03', 'DV04']
def getDV03CclDrivers(lib, data_dict):
data_dict[1] = 1
data_dict[0] = 0
def getDV04CclDrivers(lib, data_list):
data_list['driver'] = 0
if __name__ == "__main__":
manager = Manager()
dataDV03 = manager.list(['', ''])
dataDV04 = manager.dict('driver': '', 'status': '')
jobs = []
if 'DV03' in ants:
j = multiprocessing.Process(
target=getDV03CclDrivers,
args=('LORR', dataDV03))
jobs.append(j)
if 'DV04' in ants:
j = multiprocessing.Process(
target=getDV04CclDrivers,
args=('LORR', dataDV04))
jobs.append(j)
for j in jobs:
j.start()
for j in jobs:
j.join()
print 'Results:\n'
print 'DV03', dataDV03
print 'DV04', dataDV04
因为多处理实际上使用单独的进程,所以不能简单地共享全局变量,因为它们将位于内存中完全不同的“空间”中。您在一个流程下对全局所做的事情不会反映在另一个流程中。虽然我承认它看起来很混乱,因为你看到它的方式,它都存在于同一段代码中,所以“为什么这些方法不能访问全局”?很难理解它们将在不同进程中运行的想法。
Manager class 用作数据结构的代理,可以为您在进程之间来回传递信息。你要做的就是从一个管理器创建一个特殊的字典和列表,将它们传递到你的方法中,然后在本地对它们进行操作。
不可腌制的数据
对于您的专业 LORR 对象,您可能需要创建类似代理的东西,它可以表示实例的可选择状态。
不是超级健壮,也不是经过太多测试,但给了你想法。
class LORRProxy(object):
def __init__(self, lorrObject=None):
self.instance = lorrObject
def __getstate__(self):
# how to get the state data out of a lorr instance
inst = self.instance
state = dict(
foo = inst.a,
bar = inst.b,
)
return state
def __setstate__(self, state):
# rebuilt a lorr instance from state
lorr = LORR.LORR()
lorr.a = state['foo']
lorr.b = state['bar']
self.instance = lorr
【讨论】:
但是Manager
对象使用pickle
来传输数据。因此,如果 OP 的数据真的不可腌制,我认为这是行不通的。
@senderle:这可能是真的,但是 OP 还没有提供任何关于不可腌制数据的示例。我只能回答我所看到的:-)
感谢您的详细解答。根据回复,死锁是 1)我需要传递给父进程的值返回 pickle 错误,而子进程分隔的空格阻止传回全局变量。
@user1459256:senderle 展示了如何创建可以腌制的类结构的示例。因此,您可以将其与 Queue 建议或我的建议结合使用,方法是将它们添加到 manager dict 并列出代理对象。
我无法更改用于实例化对象的底层代码。它在 ALMA 天文台系统的深处,需要付出巨大的努力才能让有人来改变它。可以将任意类实例化或转换为 ctype 吗?【参考方案3】:
使用multiprocess
时,进程间传递对象的唯一方法是使用Queue
或Pipe
;全局变量不共享。对象必须是可腌制的,所以 multiprocess
在这里帮不了你。
【讨论】:
多进程有一个管理器,用于在进程之间穿梭数据。 是队列再次将值传递给父函数还是仅用于同级别函数的唯一方法? . 文档 (docs.python.org/3/library/…) 说“共享对象将是进程和线程安全的”。这到底是什么意思?我说得对吗,这意味着相应的进程/线程将在更新值之前在内部应用锁,然后更改为值,然后释放锁。对吗?【参考方案4】:您也可以使用multiprocessing Array。这允许您在进程之间拥有共享状态,并且可能是最接近全局变量的东西。
在 main 的顶部,声明一个数组。第一个参数“i”表示它将是整数。第二个参数给出初始值:
shared_dataDV03 = multiprocessing.Array ('i', (0, 0)) #a shared array
然后将此数组作为参数传递给进程:
j = multiprocessing.Process(target=getDV03CclDrivers, args=('LORR',shared_dataDV03))
你必须在被调用的函数中接收数组参数,然后你可以在函数内修改它:
def getDV03CclDrivers(lib,arr): # call global variable
arr[1]=1
arr[0]=0
该数组与父级共享,因此您可以在父级中打印出末尾的值:
print 'DV03', shared_dataDV03[:]
它会显示变化:
DV03 [0, 1]
【讨论】:
【参考方案5】:我使用 p.map() 将多个进程分拆到远程服务器,并在它们在不可预知的时间返回时打印结果:
Servers=[...]
from multiprocessing import Pool
p=Pool(len(Servers))
p.map(DoIndividualSummary, Servers)
如果DoIndividualSummary
使用print
作为结果,这工作正常,但整体结果的顺序不可预测,这使得解释变得困难。我尝试了多种使用全局变量的方法,但都遇到了问题。最后,我用 sqlite3 成功了。
在p.map()
之前,打开一个sqlite连接并创建一个表:
import sqlite3
conn=sqlite3.connect('servers.db') # need conn for commit and close
db=conn.cursor()
try: db.execute('''drop table servers''')
except: pass
db.execute('''CREATE TABLE servers (server text, serverdetail text, readings text)''')
conn.commit()
然后,当从DoIndividualSummary()
返回时,将结果保存到表中:
db.execute('''INSERT INTO servers VALUES (?,?,?)''', (server,serverdetail,readings))
conn.commit()
return
map()
语句之后,打印结果:
db.execute('''select * from servers order by server''')
rows=db.fetchall()
for server,serverdetail,readings in rows: print serverdetail,readings
可能看起来有点矫枉过正,但对我来说它比推荐的解决方案更简单。
【讨论】:
以上是关于多处理全局变量更新未返回给父级的主要内容,如果未能解决你的问题,请参考以下文章
Python多处理:我可以使用更新的全局变量重用进程(已经并行化的函数)吗?
POSTMAN 错误:未解析变量:“此变量未在活动集合、环境或全局变量中定义。”
python多处理是否通过全局标志变量安全地进行进程间信令?