Load and dump to file in multithreading using Pickle and filelock - IOError: [Errno 13]
Posted: 2017-12-10 09:51:54

Question:

I have a service that uses Python 2.7 cPickle to load data from, and dump data to, a file holding a Python dict. The service can be called by several users at the same time.
What approach would allow cPickle to read data from and dump data to a single file in a multithreaded context, so that the data never goes out of sync (one caller loading while another is dumping) during the operation?
I was thinking of using filelock, but I have not managed to get it working so far.
With the code below, I always get an IOError: [Errno 13] Permission denied error (from cPickle.load(cache_file)) in init_cache() or update_cache().
''' example of a dict dumped by pickle
{
    "version": "1499180895",
    "queries": {
        "001::id,name,age": "aBase64EncodedString==",
        "002::id,name,sex": "anotherBase64EncodedString=="
    }
}
'''
import cPickle as pickle
import filelock
import base64
import json
import traceback
import zlib
from os import path

# Snippet from a service class; these attributes are set elsewhere (e.g. in __init__).
self.cache_file_path = "\\\\serverDisk\\cache\\cache.pkl"
self.select_by_values = "001"
self.out_fields = ["id", "name", "age"]

def get_from_cache_fn(self):
    try:
        server_version = self.query_version()
        query_id = "{}::{}".format(self.select_by_values, ",".join(self.out_fields))
        if path.isfile(self.cache_file_path):
            cache_dict = self.load_cache(server_version, query_id)
            if cache_dict["version"] == server_version:
                if query_id in cache_dict["queries"]:
                    return cache_dict["queries"][query_id]
                else:
                    return self.update_cache(cache_dict, query_id)["queries"][query_id]
            else:
                return self.init_cache(server_version, query_id)["queries"][query_id]
        else:
            return self.init_cache(server_version, query_id)["queries"][query_id]
    except Exception:
        self.add_service_error(ERRORS["get_from_cache"][0], traceback.format_exc())

def load_cache(self, server_version, query_id):
    with open(self.cache_file_path, "rb") as cache_file:
        try:
            cache_dict = pickle.load(cache_file)
            return cache_dict
        except StandardError:
            return self.init_cache(server_version, query_id)

def init_cache(self, server_version, query_id):
    cache_dict = {
        "version": server_version,
        "queries": {
            query_id: base64.b64encode(zlib.compress(json.dumps(self.query_features())))
        }
    }
    lock = filelock.FileLock(self.cache_file_path)
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
            return cache_dict
    except filelock.Timeout:
        self.add_service_error("init_cache timeout", traceback.format_exc())

def update_cache(self, cache_dict, query_id):
    cache_dict["queries"][query_id] = base64.b64encode(zlib.compress(json.dumps(self.query_features())))
    lock = filelock.FileLock(self.cache_file_path)
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
            return cache_dict
    except filelock.Timeout:
        self.add_service_error("update_cache timeout", traceback.format_exc())
Answer 1:

According to the filelock documentation, you should wrap lock.acquire in a try/except block; otherwise, when the acquire times out, it can crash your application with an unhandled exception. See https://pypi.python.org/pypi/filelock
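A minimal sketch of that pattern, assuming a plain FileLock with a 10-second timeout (the path and the handling code are placeholders):

import filelock

lock = filelock.FileLock("cache.pkl.lock")  # the lock file, separate from the data file
try:
    # acquire() blocks for up to `timeout` seconds, then raises filelock.Timeout
    with lock.acquire(timeout=10):
        pass  # read or write the shared file here
except filelock.Timeout:
    pass  # log and recover instead of letting the service crash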
Comments:

Thanks for pointing this out, but I still get the same error! Maybe there is a bug in the library that lets two threads acquire the lock at the same time? I don't know what the problem might be.

Answer 2:

I found the solution to my problem.
It turns out you have to give the lock a name different from the file you are opening. A likely explanation is that FileLock opens and holds the lock file itself, so reopening that same file for writing then fails with Permission denied.
lock = filelock.FileLock(".lock".format(self.cache_file_path))
而不是lock = filelock.FileLock(self.cache_file_path)
For example:
def update_cache(self, cache_dict, query_id):
    cache_dict["queries"][query_id] = base64.b64encode(zlib.compress(json.dumps(self.query_features())))
    # Lock a sidecar ".lock" file, not the pickle file that gets opened below.
    lock = filelock.FileLock("{}.lock".format(self.cache_file_path))
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
            return cache_dict
    except filelock.Timeout:
        self.add_service_error("update_cache timeout", traceback.format_exc())