使用 Pickle 和 filelock 在多线程中加载和转储文件 - IOError:[Errno 13]

Posted

技术标签:

【中文标题】使用 Pickle 和 filelock 在多线程中加载和转储文件 - IOError:[Errno 13]【英文标题】:Load and dump to file in multithreading using Pickle and filelock - IOError: [Errno 13] 【发布时间】:2017-12-10 09:51:54 【问题描述】:

我有一项服务,它使用 python 2.7 cPickle 从 python dict 加载和转储数据到文件中。该服务可以被多个用户同时调用。

什么方法可以允许cPickle 在多线程上下文中读取数据并将其转储到单个文件中,以避免在操作期间数据不同步(在另一个进程转储时加载)问题?

我正在考虑使用filelock,但我还没有成功。

使用下面的代码,文件在init_cache()update_cache() 中总是有cPickle.load(cache_file)IOError: [Errno 13] Permission denied" 错误

''' example of a dict dumped by pickle

   
     "version": "1499180895", 
     "queries":  
         "001::id,name,age" : "aBase64EncodedString==",
         "002::id,name,sex" : "anotherBase64EncodedString=="
      
   

'''


import cPickle as pickle
import filelock
from os import path

self.cache_file_path = "\\\\serverDisk\\cache\\cache.pkl"
self.select_by_values = "001"
self.out_fields = ["id", "name", "age"]

def get_from_cache_fn(self):
    try:
        server_version = self.query_version()
        query_id = "::".format(self.select_by_values, ",".join(self.out_fields))
        if path.isfile(self.cache_file_path):
            cache_dict = self.load_cache(server_version, query_id)
            if cache_dict["version"] == server_version:
                if query_id in cache_dict["queries"]:
                     return cache_dict["queries"][query_id]
                else:
                    return self.update_cache(cache_dict, query_id)["queries"][query_id]
            else:
                return self.init_cache(server_version, query_id)["queries"][query_id]
        else:
            return self.init_cache(server_version, query_id)["queries"][query_id]
    except Exception:
        self.add_service_error(ERRORS["get_from_cache"][0], traceback.format_exc())


def load_cache(self, server_version, query_id):
    with open(self.cache_file_path, "rb") as cache_file:
        try:
            cache_dict = pickle.load(cache_file)
            return cache_dict
        except StandardError:
            return self.init_cache(server_version, query_id)


def init_cache(self, server_version, query_id):
    cache_dict = 
        "version" : server_version,
        "queries" : 
            query_id : base64.b64encode(zlib.compress(json.dumps(self.query_features())))
        
    
    lock = filelock.FileLock(self.cache_file_path)
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
                return cache_dict
    except lock.Timeout:
        self.add_service_error("init_cache timeout", traceback.format_exc())


def update_cache(self, cache_dict, query_id):
    cache_dict["queries"][query_id] = base64.b64encode(zlib.compress(json.dumps(self.query_features())))
    lock = filelock.FileLock(self.cache_file_path)
    try:
        with lock.acquire(timeout = 10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
                return cache_dict
    except lock.Timeout:
        self.add_service_error("update_cache timeout", traceback.format_exc())

【问题讨论】:

【参考方案1】:

根据 Filelock 文档,您应该将 lock.acquire 包装在 tryexcept 中。否则,当您的获取超时时,它可能会使您的应用程序因未处理的异常而崩溃。见https://pypi.python.org/pypi/filelock

【讨论】:

感谢您指出这一点,但我仍然有同样的错误! 也许库中存在一些错误,让 2 个线程同时获取锁?我不知道可能是什么问题。【参考方案2】:

我找到了解决问题的方法。

看来您必须提供与您正在打开的文件不同的锁名称。

lock = filelock.FileLock(".lock".format(self.cache_file_path)) 而不是lock = filelock.FileLock(self.cache_file_path)

例如:

def update_cache(self, cache_dict, query_id):
    cache_dict["queries"][query_id] = base64.b64encode(zlib.compress(json.dumps(self.query_features())))
    lock = lock = filelock.FileLock(".lock".format(self.cache_file_path))
    try:
        with lock.acquire(timeout = 10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
                return cache_dict
    except lock.Timeout:
        self.add_service_error("update_cache timeout", traceback.format_exc())

【讨论】:

以上是关于使用 Pickle 和 filelock 在多线程中加载和转储文件 - IOError:[Errno 13]的主要内容,如果未能解决你的问题,请参考以下文章

Day388.Selector&Pipe&fileLock文件锁&Path&Files&AsynchronousFileChannel异步通道 -NIO(代码片

在多线程中使用静态方法是否有线程安全问题

在多线程应用程序中使用 opencv waitKey()

可以在多线程环境中使用单个 QueueConnection 吗?

利刃 MVVMLight 8:DispatchHelper在多线程和调度中的使用

如何在多线程环境中使用嵌入式 MySQL?