CELERY: in-memory variables bleeding data between tasks
Posted by huim
0X01 Background
On Thursday and Friday I spent two full days thinking about a Celery memory-leak problem.
The situation was this:
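While using in-memory variables (variables read across several steps of a call chain), a question suddenly occurred to me: do Celery's multiple processes share a single variable, so that the value gets mixed up when several processes run at the same time?
For example, process 1 sets target to A and process 2 sets target to B. If the two processes share one target, process 1 ends up reading B.
More concretely: the plugin-scan entry point receives a cluster name, and after many layers of calls the packet-sending function needs it. Passing it through an in-memory variable is the most convenient option, but if processes overwrite each other's values, that approach is dead.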
0X02 Demo and questions
So I wrote a demo to test it.
1) Celery task producer: celery_test.py
from tasks import task_main

# Enqueue 100 main tasks on the 'Aefault' queue
for i in range(100):
    task_main.apply_async(args=[str(i)], queue='Aefault', routing_key='aefault')
2) Variable module: data_type.py
import copy
import types

my_data = dict()


class AttribDict(dict):
    """
    This class defines the dictionary with added capability to access members as attributes

    >>> foo = AttribDict()
    >>> foo.bar = 1
    >>> foo.bar
    1
    """

    def __init__(self, indict=None, attribute=None):
        if indict is None:
            indict = {}

        # Set any attributes here - before initialisation
        # these remain as normal attributes
        self.attribute = attribute
        dict.__init__(self, indict)
        self.__initialised = True

        # After initialisation, setting attributes
        # is the same as setting an item

    def __getattr__(self, item):
        """
        Maps values to attributes
        Only called if there *is NOT* an attribute with this name
        """
        try:
            return self.__getitem__(item)
        except KeyError:
            return ""
            # raise AttributeError("unable to access item '%s'" % item)

    def __setattr__(self, item, value):
        """
        Maps attributes to values
        Only if we are initialised
        """
        # This test allows attributes to be set in the __init__ method
        if "_AttribDict__initialised" not in self.__dict__:
            return dict.__setattr__(self, item, value)
        # Any normal attributes are handled normally
        elif item in self.__dict__:
            dict.__setattr__(self, item, value)
        else:
            self.__setitem__(item, value)

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, dict):
        self.__dict__ = dict

    def __deepcopy__(self, memo):
        retVal = self.__class__()
        memo[id(self)] = retVal

        for attr in dir(self):
            if not attr.startswith('_'):
                value = getattr(self, attr)
                if not isinstance(value, (types.BuiltinFunctionType, types.FunctionType, types.MethodType)):
                    setattr(retVal, attr, copy.deepcopy(value, memo))

        for key, value in self.items():
            retVal.__setitem__(key, copy.deepcopy(value, memo))

        return retVal
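# Module-level instance: shared by every task that runs in the same worker process
data_info = AttribDict()
print(id(data_info))


def change_add():
    b = AttribDict()
    # This only binds a local name; without `global data_info`
    # the module-level data_info is not replaced
    data_info = b


change_add()
print(id(data_info))  # same id as the first print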
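A side note on change_add(): it assigns to data_info without a `global` declaration, so the assignment only creates a local name and both print(id(data_info)) calls show the same id. The demo's tasks share state because they mutate the module-level dicts in place. Nothing rebinds them. Below is a minimal standalone sketch of the difference. The names shared, rebind_locally, rebind_globally and mutate are illustrative and do not come from the demo.

```python
# Standalone sketch: rebinding a name vs. mutating the object it points to.
shared = {}


def rebind_locally():
    # Assignment makes `shared` local to this function; the module-level dict is untouched.
    shared = {"x": 1}
    return shared


def mutate():
    # No assignment to the name itself, so this edits the existing module-level object.
    shared["z"] = 3


def rebind_globally():
    # `global` makes the assignment replace the module-level object.
    global shared
    shared = {"y": 2}


before = id(shared)
rebind_locally()
print(id(shared) == before, shared)  # True {}
mutate()
print(id(shared) == before, shared)  # True {'z': 3}
rebind_globally()
print(id(shared) == before, shared)  # False {'y': 2}
```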
3) Celery task consumer that also produces a subtask: tasks.py
import os
import random
import threading

from celery import Task
from celery_app import app
from data_type import data_info, change_add, my_data
from recv import task_node  # the subtask consumed in step 4


class MyTask(Task):
    # Placeholder: the post does not show its MyTask base class
    pass


@app.task(name='tasks.main', base=MyTask, bind=True, max_retries=10, default_retry_delay=10)
def task_main(self, param):
    # change_add()
    # print(str(os.getpid()) + " " + str(threading.current_thread().ident))
    # Write a uniquely named key into both module-level dicts
    data_info["A_DATA " + param + " " + str(random.randint(0, 1000))] = 0
    my_data["A_DATA " + param + " " + str(random.randint(0, 1000))] = 0
    print("--- MAIN " + param + " " + str(id(data_info)) + " " + str(data_info))
    print("=== MAIN " + param + " " + str(id(my_data)) + " " + str(my_data))
    # Enqueue the subtask; only `param` travels through the broker
    task_node.apply_async(args=[str(param)], queue='Aefault', routing_key='aefault')
4) Subtask consumer: recv.py
import random

from celery_app import app
from data_type import data_info, change_add, my_data
from recv2 import recv2


@app.task()
def task_node(param):
    # Print the dicts before writing: anything here was left behind in this process's memory
    print("--- NODE " + param + " " + str(id(data_info)) + " " + str(data_info))
    print("=== NODE " + param + " " + str(id(my_data)) + " " + str(my_data))
    data_info["B_DATA " + param + " " + str(random.randint(0, 1000))] = 0
    my_data["B_DATA " + param + " " + str(random.randint(0, 1000))] = 0
    recv2(param)
5) Function called by the subtask: recv2.py
from data_type import data_info, change_add, my_data


def recv2(param):
    print("--- recv " + param + " " + str(id(data_info)) + " " + str(data_info))
    print("=== recv " + param + " " + str(id(my_data)) + " " + str(my_data))
6) Log output
[2020-06-12 17:39:11,535: WARNING/ForkPoolWorker-10] --- MAIN 0 140072813068960 {'A_DATA 0 243': 0}
[2020-06-12 17:39:11,535: WARNING/ForkPoolWorker-10] === MAIN 0 140072813219184 {'A_DATA 0 711': 0}
[2020-06-12 17:39:11,542: INFO/MainProcess] Received task: tasks.main[281a82a5-57e3-4442-80ea-6b5218871b98]
[2020-06-12 17:39:11,544: INFO/MainProcess] Received task: tasks.main[ef098266-0cab-438e-ba6a-db3ee486c0da]
[2020-06-12 17:39:11,546: WARNING/ForkPoolWorker-3] --- MAIN 1 140072813068960 {'A_DATA 1 61': 0}
[2020-06-12 17:39:11,546: WARNING/ForkPoolWorker-3] === MAIN 1 140072813219184 {'A_DATA 1 85': 0}
[2020-06-12 17:39:11,548: INFO/MainProcess] Received task: tasks.main[152afd1e-92c1-4738-8602-bde7d29f8ce4]
[2020-06-12 17:39:11,548: WARNING/ForkPoolWorker-2] --- MAIN 2 140072813068960 {'A_DATA 2 521': 0}
[2020-06-12 17:39:11,549: WARNING/ForkPoolWorker-2] === MAIN 2 140072813219184 {'A_DATA 2 802': 0}
[2020-06-12 17:39:11,550: WARNING/ForkPoolWorker-6] --- MAIN 3 140072813068960 {'A_DATA 3 574': 0}
[2020-06-12 17:39:11,551: WARNING/ForkPoolWorker-6] === MAIN 3 140072813219184 {'A_DATA 3 208': 0}
[2020-06-12 17:39:11,560: INFO/MainProcess] Received task: tasks.main[de16ad35-bde8-41ed-a910-8eb8e9dff486]
[2020-06-12 17:39:11,562: INFO/MainProcess] Received task: tasks.main[444cbb4e-7af2-4123-bf59-b69b1d3ed06c]
[2020-06-12 17:39:11,563: WARNING/ForkPoolWorker-8] --- MAIN 4 140072813068960 {'A_DATA 4 241': 0}
[2020-06-12 17:39:11,564: WARNING/ForkPoolWorker-1] --- MAIN 5 140072813068960 {'A_DATA 5 108': 0}
[2020-06-12 17:39:11,564: WARNING/ForkPoolWorker-8] === MAIN 4 140072813219184 {'A_DATA 4 101': 0}
[2020-06-12 17:39:11,564: WARNING/ForkPoolWorker-1] === MAIN 5 140072813219184 {'A_DATA 5 710': 0}
[2020-06-12 17:39:11,577: INFO/MainProcess] Received task: tasks.main[9251870c-67e6-45e8-bedb-9e18a7adee61]
[2020-06-12 17:39:11,579: WARNING/ForkPoolWorker-7] --- MAIN 6 140072813068960 {'A_DATA 6 250': 0}
[2020-06-12 17:39:11,579: WARNING/ForkPoolWorker-7] === MAIN 6 140072813219184 {'A_DATA 6 558': 0}
[2020-06-12 17:39:11,581: INFO/MainProcess] Received task: tasks.main[8fb18613-5475-4ba8-877f-99ec7b34ae0e]
[2020-06-12 17:39:11,583: WARNING/ForkPoolWorker-5] --- MAIN 7 140072813068960 {'A_DATA 7 766': 0}
[2020-06-12 17:39:11,583: WARNING/ForkPoolWorker-5] === MAIN 7 140072813219184 {'A_DATA 7 392': 0}
[2020-06-12 17:39:11,593: INFO/MainProcess] Received task: tasks.main[87d5ad4d-3e08-4f37-a0a8-f41737ac16e3]
[2020-06-12 17:39:11,600: INFO/MainProcess] Received task: tasks.main[01263f6c-1840-4270-9265-9351454ff410]
[2020-06-12 17:39:11,602: WARNING/ForkPoolWorker-4] --- MAIN 8 140072813068960 {'A_DATA 8 204': 0}
[2020-06-12 17:39:11,602: WARNING/ForkPoolWorker-9] --- MAIN 9 140072813068960 {'A_DATA 9 350': 0}
[2020-06-12 17:39:11,602: WARNING/ForkPoolWorker-4] === MAIN 8 140072813219184 {'A_DATA 8 676': 0}
[2020-06-12 17:39:11,602: WARNING/ForkPoolWorker-9] === MAIN 9 140072813219184 {'A_DATA 9 996': 0}
[2020-06-12 17:39:11,680: INFO/MainProcess] Received task: recv.task_node[520f917f-2692-43de-ac72-e0a350980ff0]
[2020-06-12 17:39:11,681: INFO/ForkPoolWorker-3] Task tasks.main[281a82a5-57e3-4442-80ea-6b5218871b98] succeeded in 0.135809466243s: None
[2020-06-12 17:39:11,683: INFO/MainProcess] Received task: recv.task_node[c22c353d-abfe-48eb-b023-50211f35854f]
[2020-06-12 17:39:11,685: INFO/ForkPoolWorker-2] Task tasks.main[ef098266-0cab-438e-ba6a-db3ee486c0da] succeeded in 0.137533180416s: None
[2020-06-12 17:39:11,686: INFO/MainProcess] Received task: recv.task_node[d9fd63ff-3058-43e4-ba6e-c1fa92e50bbe]
[2020-06-12 17:39:11,690: INFO/ForkPoolWorker-6] Task tasks.main[152afd1e-92c1-4738-8602-bde7d29f8ce4] succeeded in 0.140443377197s: None
[2020-06-12 17:39:11,691: INFO/ForkPoolWorker-10] Task tasks.main[f15c3b32-5a70-479d-a622-addd0f03330c] succeeded in 0.156630814075s: None
[2020-06-12 17:39:11,691: WARNING/ForkPoolWorker-3] --- NODE 1 140072813068960 {'A_DATA 1 61': 0}
[2020-06-12 17:39:11,692: WARNING/ForkPoolWorker-3] === NODE 1 140072813219184 {'A_DATA 1 85': 0}
[2020-06-12 17:39:11,692: WARNING/ForkPoolWorker-3] --- recv 1 140072813068960 {'A_DATA 1 61': 0, 'B_DATA 1 706': 0}
[2020-06-12 17:39:11,692: WARNING/ForkPoolWorker-3] === recv 1 140072813219184 {'A_DATA 1 85': 0, 'B_DATA 1 214': 0}
[2020-06-12 17:39:11,692: INFO/MainProcess] Received task: recv.task_node[7fd20514-f293-493c-9d67-0526cc38e85a]
[2020-06-12 17:39:11,692: INFO/ForkPoolWorker-3] Task recv.task_node[520f917f-2692-43de-ac72-e0a350980ff0] succeeded in 0.000800415873528s: None
[2020-06-12 17:39:11,696: WARNING/ForkPoolWorker-2] --- NODE 2 140072813068960 {'A_DATA 2 521': 0}
[2020-06-12 17:39:11,696: WARNING/ForkPoolWorker-2] === NODE 2 140072813219184 {'A_DATA 2 802': 0}
[2020-06-12 17:39:11,697: WARNING/ForkPoolWorker-2] --- recv 2 140072813068960 {'B_DATA 2 81': 0, 'A_DATA 2 521': 0}
[2020-06-12 17:39:11,697: WARNING/ForkPoolWorker-2] === recv 2 140072813219184 {'A_DATA 2 802': 0, 'B_DATA 2 714': 0}
[2020-06-12 17:39:11,697: INFO/ForkPoolWorker-2] Task recv.task_node[c22c353d-abfe-48eb-b023-50211f35854f] succeeded in 0.000871725380421s: None
[2020-06-12 17:39:11,699: WARNING/ForkPoolWorker-3] --- NODE 3 140072813068960 {'A_DATA 1 61': 0, 'B_DATA 1 706': 0}
[2020-06-12 17:39:11,699: WARNING/ForkPoolWorker-3] === NODE 3 140072813219184 {'A_DATA 1 85': 0, 'B_DATA 1 214': 0}
[2020-06-12 17:39:11,699: WARNING/ForkPoolWorker-3] --- recv 3 140072813068960 {'A_DATA 1 61': 0, 'B_DATA 1 706': 0, 'B_DATA 3 7': 0}
[2020-06-12 17:39:11,699: WARNING/ForkPoolWorker-10] --- NODE 0 140072813068960 {'A_DATA 0 243': 0}
[2020-06-12 17:39:11,700: WARNING/ForkPoolWorker-3] === recv 3 140072813219184 {'A_DATA 1 85': 0, 'B_DATA 1 214': 0, 'B_DATA 3 383': 0}
[2020-06-12 17:39:11,700: WARNING/ForkPoolWorker-10] === NODE 0 140072813219184 {'A_DATA 0 711': 0}
[2020-06-12 17:39:11,700: INFO/ForkPoolWorker-3] Task recv.task_node[7fd20514-f293-493c-9d67-0526cc38e85a] succeeded in 0.000573091208935s: None
[2020-06-12 17:39:11,700: WARNING/ForkPoolWorker-10] --- recv 0 140072813068960 {'A_DATA 0 243': 0, 'B_DATA 0 345': 0}
[2020-06-12 17:39:11,700: WARNING/ForkPoolWorker-10] === recv 0 140072813219184 {'B_DATA 0 392': 0, 'A_DATA 0 711': 0}
[2020-06-12 17:39:11,700: INFO/ForkPoolWorker-10] Task recv.task_node[d9fd63ff-3058-43e4-ba6e-c1fa92e50bbe] succeeded in 0.000787496566772s: None
[2020-06-12 17:39:11,700: INFO/ForkPoolWorker-1] Task tasks.main[444cbb4e-7af2-4123-bf59-b69b1d3ed06c] succeeded in 0.1372801736s: None
[2020-06-12 17:39:11,702: INFO/MainProcess] Received task: recv.task_node[adf4ab3e-0930-43a7-a1f9-2b5a2d45556c]
[2020-06-12 17:39:11,704: WARNING/ForkPoolWorker-1] --- NODE 5 140072813068960 {'A_DATA 5 108': 0}
[2020-06-12 17:39:11,704: WARNING/ForkPoolWorker-1] === NODE 5 140072813219184 {'A_DATA 5 710': 0}
[2020-06-12 17:39:11,705: WARNING/ForkPoolWorker-1] --- recv 5 140072813068960 {'A_DATA 5 108': 0, 'B_DATA 5 46': 0}
[2020-06-12 17:39:11,705: WARNING/ForkPoolWorker-1] === recv 5 140072813219184 {'B_DATA 5 134': 0, 'A_DATA 5 710': 0}
[2020-06-12 17:39:11,705: INFO/ForkPoolWorker-1] Task recv.task_node[adf4ab3e-0930-43a7-a1f9-2b5a2d45556c] succeeded in 0.00115340203047s: None
[2020-06-12 17:39:11,706: INFO/ForkPoolWorker-5] Task tasks.main[8fb18613-5475-4ba8-877f-99ec7b34ae0e] succeeded in 0.123670294881s: None
[2020-06-12 17:39:11,706: INFO/ForkPoolWorker-8] Task tasks.main[de16ad35-bde8-41ed-a910-8eb8e9dff486] succeeded in 0.143696308136s: None
[2020-06-12 17:39:11,708: INFO/MainProcess] Received task: recv.task_node[4f58e5a2-da57-40d9-8630-dbad91fe9247]
[2020-06-12 17:39:11,711: INFO/MainProcess] Received task: recv.task_node[5a5fb2be-6c0d-46aa-bd99-2a22270b89ee]
[2020-06-12 17:39:11,711: WARNING/ForkPoolWorker-3] --- NODE 7 140072813068960 {'A_DATA 1 61': 0, 'B_DATA 1 706': 0, 'B_DATA 3 7': 0}
[2020-06-12 17:39:11,712: WARNING/ForkPoolWorker-3] === NODE 7 140072813219184 {'A_DATA 1 85': 0, 'B_DATA 1 214': 0, 'B_DATA 3 383': 0}
[2020-06-12 17:39:11,712: WARNING/ForkPoolWorker-3] --- recv 7 140072813068960 {'B_DATA 7 399': 0, 'A_DATA 1 61': 0, 'B_DATA 1 706': 0, 'B_DATA 3 7': 0}
[2020-06-12 17:39:11,712: WARNING/ForkPoolWorker-3] === recv 7 140072813219184 {'A_DATA 1 85': 0, 'B_DATA 1 214': 0, 'B_DATA 3 383': 0, 'B_DATA 7 142': 0}
[2020-06-12 17:39:11,712: INFO/ForkPoolWorker-3] Task recv.task_node[4f58e5a2-da57-40d9-8630-dbad91fe9247] succeeded in 0.000611014664173s: None
[2020-06-12 17:39:11,713: WARNING/ForkPoolWorker-2] --- NODE 4 140072813068960 {'B_DATA 2 81': 0, 'A_DATA 2 521': 0}
[2020-06-12 17:39:11,713: WARNING/ForkPoolWorker-2] === NODE 4 140072813219184 {'A_DATA 2 802': 0, 'B_DATA 2 714': 0}
[2020-06-12 17:39:11,713: WARNING/ForkPoolWorker-2] --- recv 4 140072813068960 {'B_DATA 2 81': 0, 'A_DATA 2 521': 0, 'B_DATA 4 25': 0}
[2020-06-12 17:39:11,713: WARNING/ForkPoolWorker-2] === recv 4 140072813219184 {'A_DATA 2 802': 0, 'B_DATA 4 744': 0, 'B_DATA 2 714': 0}
[2020-06-12 17:39:11,713: INFO/ForkPoolWorker-9] Task tasks.main[01263f6c-1840-4270-9265-9351454ff410] succeeded in 0.112046726048s: None
[2020-06-12 17:39:11,713: INFO/ForkPoolWorker-2] Task recv.task_node[5a5fb2be-6c0d-46aa-bd99-2a22270b89ee] succeeded in 0.000574573874474s: None
[2020-06-12 17:39:11,716: INFO/ForkPoolWorker-7] Task tasks.main[9251870c-67e6-45e8-bedb-9e18a7adee61] succeeded in 0.137739785016s: None
[2020-06-12 17:39:11,716: INFO/MainProcess] Received task: recv.task_node[c5c172f5-bc96-4ef1-83ec-a054fb461076]
[2020-06-12 17:39:11,717: INFO/ForkPoolWorker-4] Task tasks.main[87d5ad4d-3e08-4f37-a0a8-f41737ac16e3] succeeded in 0.11572509259s: None
[2020-06-12 17:39:11,717: WARNING/ForkPoolWorker-5] --- NODE 9 140072813068960 {'A_DATA 7 766': 0}
[2020-06-12 17:39:11,717: WARNING/ForkPoolWorker-5] === NODE 9 140072813219184 {'A_DATA 7 392': 0}
[2020-06-12 17:39:11,718: WARNING/ForkPoolWorker-5] --- recv 9 140072813068960 {'A_DATA 7 766': 0, 'B_DATA 9 5': 0}
[2020-06-12 17:39:11,718: WARNING/ForkPoolWorker-5] === recv 9 140072813219184 {'B_DATA 9 30': 0, 'A_DATA 7 392': 0}
[2020-06-12 17:39:11,718: INFO/ForkPoolWorker-5] Task recv.task_node[c5c172f5-bc96-4ef1-83ec-a054fb461076] succeeded in 0.000654295086861s: None
[2020-06-12 17:39:11,719: INFO/MainProcess] Received task: recv.task_node[26820268-5d73-4127-abb6-5acf4e1cd516]
[2020-06-12 17:39:11,721: INFO/MainProcess] Received task: recv.task_node[121cb307-fbcd-44d5-a826-9244a253b468]
[2020-06-12 17:39:11,722: WARNING/ForkPoolWorker-1] --- NODE 6 140072813068960 {'A_DATA 5 108': 0, 'B_DATA 5 46': 0}
[2020-06-12 17:39:11,722: WARNING/ForkPoolWorker-1] === NODE 6 140072813219184 {'B_DATA 5 134': 0, 'A_DATA 5 710': 0}
[2020-06-12 17:39:11,722: WARNING/ForkPoolWorker-1] --- recv 6 140072813068960 {'B_DATA 6 220': 0, 'A_DATA 5 108': 0, 'B_DATA 5 46': 0}
[2020-06-12 17:39:11,722: WARNING/ForkPoolWorker-1] === recv 6 140072813219184 {'B_DATA 6 540': 0, 'B_DATA 5 134': 0, 'A_DATA 5 710': 0}
[2020-06-12 17:39:11,722: INFO/ForkPoolWorker-1] Task recv.task_node[26820268-5d73-4127-abb6-5acf4e1cd516] succeeded in 0.00059811770916s: None
[2020-06-12 17:39:11,723: WARNING/ForkPoolWorker-9] --- NODE 8 140072813068960 {'A_DATA 9 350': 0}
[2020-06-12 17:39:11,723: WARNING/ForkPoolWorker-9] === NODE 8 140072813219184 {'A_DATA 9 996': 0}
[2020-06-12 17:39:11,723: WARNING/ForkPoolWorker-9] --- recv 8 140072813068960 {'B_DATA 8 166': 0, 'A_DATA 9 350': 0}
[2020-06-12 17:39:11,723: WARNING/ForkPoolWorker-9] === recv 8 140072813219184 {'A_DATA 9 996': 0, 'B_DATA 8 273': 0}
[2020-06-12 17:39:11,724: INFO/ForkPoolWorker-9] Task recv.task_node[121cb307-fbcd-44d5-a826-9244a253b468] succeeded in 0.000640526413918s: None
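Judging from the data, with the prefork pool (multiprocessing) and -c 1 (a concurrency of 1), a value written into my_data during one task is still available during the next task that process runs.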
The test environment was Python 2 + Celery 3.2.1. There, the in-memory variables were reset within 3-5 seconds, but their addresses stayed the same.
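With Python 3 and Celery 4 they are not reset at all. When setting my_data[random.randint(0, 1000)] = 0, every completed task leaves one more key in my_data, which means the in-memory variables are never reset.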
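Laid out as a flow: A produces data, B consumes it and produces a new task, and C consumes that task and calls D. (In the demo, A is celery_test.py, B is task_main, C is task_node, D is recv2, and E is data_info / my_data.)
B modifies E and then sends a task to the broker. When C later reads E, it gets the data B wrote earlier. That data does not come through the broker; it is still sitting in memory.
So does this mean that multiple processes keep these variables in the same memory, and will overwrite each other and get mixed up?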
0X03 Does multiprocessing mix up data?
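In the settings: CELERYD_MAX_TASKS_PER_CHILD = 7  # number of tasks each worker child process runs before it is replaced
The worker is started with the prefork pool and -c 3.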
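For reference, here is a sketch of that setup. It assumes a config module named celeryconfig.py (an assumption) and the demo's celery_app module. The lowercase setting name used from Celery 4 onward appears as a comment.

```python
# celeryconfig.py: sketch of the setup described above (file name assumed).
# Celery 3.x uppercase setting name, as used in this post:
CELERYD_MAX_TASKS_PER_CHILD = 7  # replace each pool child process after 7 tasks

# Celery 4+ spells the same setting in lowercase:
# worker_max_tasks_per_child = 7

# Worker started with the prefork pool and 3 child processes, e.g.:
#   celery -A celery_app worker -P prefork -c 3 -l info
```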
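The execution model actually looks like this:
Processes 1, 2 and 3 do not affect each other's data.
Tasks 1-7 inside a single process, however, all use the same memory. So if B modifies E in task 1, C reads E in task 2 and sees that data.
In theory, the task B sends to C carries nothing from E, so C should not see it. But the worker process has not been destroyed, so the variable's memory is unchanged and C sees the data anyway: every task run by the same process shares it.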
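Can this mix up data between tasks? No, because tasks 1-7 are actually executed one after another. (A task should still set the value itself before reading it, since leftovers from earlier tasks remain visible.)
While a process is running C, it does not execute any other task, so no other B or C can overwrite E in the meantime.
So multiprocessing does not mix up data.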
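The model above can be sketched without Celery. This is a pure-Python simulation and does not reflect Celery's implementation. SimulatedChild, SimulatedPoolSlot, task_b and task_c are hypothetical names. Each simulated child keeps its own E dict, as a separate process would, and runs one task at a time. After max_tasks_per_child tasks it is replaced, mirroring CELERYD_MAX_TASKS_PER_CHILD. One slot stands in for one of the three -c 3 children.

```python
class SimulatedChild:
    """Stands in for one prefork child process and its private module state."""

    def __init__(self, pid):
        self.pid = pid
        self.E = {}          # this child's own copy of the module-level dict
        self.tasks_run = 0


class SimulatedPoolSlot:
    """One pool slot: runs tasks one at a time, replacing the child after N tasks."""

    def __init__(self, max_tasks_per_child):
        self.max_tasks_per_child = max_tasks_per_child
        self.next_pid = 1
        self.child = self._spawn()

    def _spawn(self):
        child = SimulatedChild(self.next_pid)
        self.next_pid += 1
        return child

    def run(self, task, *args):
        if self.child.tasks_run >= self.max_tasks_per_child:
            self.child = self._spawn()   # old child exits, taking its E with it
        self.child.tasks_run += 1
        return task(self.child, *args)


def task_b(child, n):
    child.E["B_DATA %d" % n] = 0         # B writes E, then would enqueue C via the broker


def task_c(child):
    return child.pid, sorted(child.E)    # C reads E: whatever this child still holds


slot = SimulatedPoolSlot(max_tasks_per_child=7)
results = []
for n in range(5):
    slot.run(task_b, n)
    results.append(slot.run(task_c))

for r in results:
    print(r)
```

The first three C runs see everything earlier B runs wrote in child 1. The fourth C run is the 8th task, so it lands in a freshly spawned child 2 and prints (2, []). The in-memory value is therefore shared by accident of scheduling, and C cannot rely on it.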
0X04 Do coroutines / multithreading mix up data?
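1) With coroutines and multithreading, data does get mixed up.
Take multithreading: one process runs n threads that share the same memory, so a variable stored in memory can be changed by any of them at any moment.
Suppose C sets a value that D, at the end of the call chain, needs to read. If it is kept in a single variable, another thread can overwrite or read it at any time. A lock is not a practical fix either: it would have to be held across the entire C-to-D call chain, which serializes the threads.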
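The problem can be reproduced with plain Python 3 threads, without Celery. In this sketch the names cluster_name and scan and the thread labels are illustrative. A threading.Barrier forces both threads to write the shared variable before either one reads it, which is the worst-case interleaving.

```python
import threading

cluster_name = None               # a single module-level variable shared by all threads
barrier = threading.Barrier(2)    # makes both writes happen before either read
seen = {}


def scan(name, value):
    global cluster_name
    cluster_name = value          # step C: store the value for later
    barrier.wait()                # the other thread writes in the meantime
    seen[name] = cluster_name     # step D: read it back at the end of the chain


threads = [threading.Thread(target=scan, args=("t1", "A")),
           threading.Thread(target=scan, args=("t2", "B"))]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(seen)  # both threads see the same value, so one of them read the wrong one
```

Whichever thread wrote last wins. Both reads return that value, so one thread gets data that belongs to the other.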
In that case, keep a dict keyed by task:
task_data = {
    "<identifier of task 1>": "<data task 1 needs>",
    "<identifier of task 2>": "<data task 2 needs>",
}
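Below is a minimal sketch of that keyed dict using the same two-thread setup. It assumes the thread ID from threading.get_ident() serves as the key. The names task_data, scan and the thread labels are illustrative.

```python
import threading

task_data = {}                    # {task identifier: data that task needs}
barrier = threading.Barrier(2)
seen = {}


def scan(name, value):
    key = threading.get_ident()   # identifier of the current thread
    task_data[key] = value        # step C: store under this thread's own key
    barrier.wait()                # the other thread stores its own entry meanwhile
    seen[name] = task_data[key]   # step D: look the value up with the same key


threads = [threading.Thread(target=scan, args=("t1", "A")),
           threading.Thread(target=scan, args=("t2", "B"))]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(seen["t1"], seen["t2"])  # A B
print(len(task_data))          # 2
```

Each thread reads back its own value. Nothing removes the entries, though, so the dict grows by one entry per task.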
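2) Choosing the identifier
os.getpid(): every thread has the same process ID, so it cannot tell tasks apart;
the thread ID, however, differs per thread, so it can serve as the task identifier: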
import threading

print(threading.current_thread().name)
print(threading.current_thread().ident)
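You can also use the Celery task id (request.id). It works under multiprocessing, multithreading and coroutines alike, so there is nothing more to worry about.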
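Here is a quick check of the claim about process and thread IDs, written as a Python 3 sketch. Three threads started from one process report the same os.getpid() but different thread idents. The barrier keeps all three threads alive at once, because CPython may reuse a thread's ident after that thread exits.

```python
import os
import threading

N = 3
barrier = threading.Barrier(N)    # keeps all threads alive together so idents are not reused
ids = []


def record():
    ids.append((os.getpid(), threading.get_ident(), threading.current_thread().name))
    barrier.wait()


threads = [threading.Thread(target=record) for _ in range(N)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len({pid for pid, _, _ in ids}))      # 1: every thread reports the same PID
print(len({ident for _, ident, _ in ids}))  # 3: each thread has its own ident
```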
3) Reading the task id
recv2.py (step D):
from data_type import data_info, change_add, my_data


def recv2(param):
    # Imported inside the function to avoid a circular import (recv imports recv2)
    from recv import task_node
    # Context of the task currently executing in this worker
    print(task_node.request)
    print(task_node.request.id)
    print("--- recv " + param + " " + str(id(data_info)) + " " + str(data_info))
    print("=== recv " + param + " " + str(id(my_data)) + " " + str(my_data))

<Context: {
    'lang': 'py',
    'task': 'recv.task_node',
    'id': '1f4c9e66-3673-4163-a8f7-2a5a8418f742',
    'shadow': None,
    'eta': None,
    'expires': None,
    'group': None,
    'retries': 0,
    'timelimit': [None, None],
    'root_id': 'f804e984-6e76-4b9d-98fb-6fe3f6bb935f',
    'parent_id': 'f804e984-6e76-4b9d-98fb-6fe3f6bb935f',
    'argsrepr': "['2']",
    'kwargsrepr': '{}',
    'origin': 'gen579559@127.0.0.1',
    'reply_to': '7c0a805f-de11-36bc-b5f4-f6bd1f5ea5e4',
    'correlation_id': '1f4c9e66-3673-4163-a8f7-2a5a8418f742',
    'hostname': 'celery@127.0.0.1',
    'delivery_info': {
        'exchange': '',
        'routing_key': 'Aefault',
        'priority': 0,
        'redelivered': None
    },
    'args': ['2'],
    'kwargs': {},
    'is_eager': False,
    'callbacks': None,
    'errbacks': None,
    'chain': None,
    'chord': None,
    'called_directly': False,
    '_protected': 1
}>
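4) Memory leak
With multithreading the dict is keyed by task id or thread id, so it keeps growing. In principle you should therefore set CELERYD_MAX_TASKS_PER_CHILD = 100 (or 1000), so that each worker process is destroyed and restarted after a fixed number of tasks. Note that this setting recycles pool processes, so it may not take effect with thread-based or greenlet pools.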
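Recycling processes is one fix. Another way to keep the keyed dict from growing is to remove each task's entry when the task finishes. This is a sketch under stated assumptions: task_scope, send_packet and scan are hypothetical names. A random UUID stands in for the Celery task id, which a bound task would read from self.request.id.

```python
import uuid
from contextlib import contextmanager

task_data = {}  # {task id: data}, shared by everything running in this process


@contextmanager
def task_scope(task_id, value):
    """Hypothetical helper: register a task's data and always remove it afterwards."""
    task_data[task_id] = value
    try:
        yield
    finally:
        task_data.pop(task_id, None)  # runs even if the task raises


def send_packet(task_id):
    # Step D, deep in the call chain: look the value up by task id.
    return "sending with cluster=%s" % task_data[task_id]


def scan(cluster, task_id):
    # In a bound Celery task the id would come from self.request.id;
    # here a random UUID stands in for it.
    with task_scope(task_id, cluster):
        return send_packet(task_id)


results = [scan("cluster-%d" % i, str(uuid.uuid4())) for i in range(1000)]
print(results[0], len(task_data))  # sending with cluster=cluster-0 0
```

The entry is popped in a finally block, so it is removed even when the task raises. The dict therefore only holds entries for tasks that are currently running.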