如何在使用多处理时将数据添加到 json 文件?
Posted
技术标签:
【中文标题】如何在使用多处理时将数据添加到 json 文件?【英文标题】:How to add data to a json file while making use of multiproccesing? 【发布时间】:2021-07-12 07:50:10 【问题描述】:我正在使用一个用户友好的基于 json 的面向文档的数据库,名为 TinyDB。但是我无法将多条数据添加到我的数据库中,因为我正在使用多处理。过了一会儿,我得到了数据库中已经存在 id x 的错误(这是因为 2 个或更多进程正在尝试同时添加数据)。有没有办法解决这个问题?
每次运行我都会插入新的唯一参数。
示例参数:
params = 'id' = 1, 'name': 'poop', 'age': 99
代码:
resultsDb = TinyDB('db/resultsDb.json')
def run(params):
resultsDb.insert('id': params['id'], 'name': params['name'], 'age': params['age'])
maxProcesses = 12 # Cores in my pc
for i in range(maxProcesses):
processes.append(Process(target=run, args=(params,)))
for p in processes:
p.start()
for p in processes:
p.join()
【问题讨论】:
首先,您在params['id]
中缺少'
。其次,您的代码将相同的 params
值(您没有显示)传递给所有 12 个进程。这真的是您的实际代码吗?如果是这样,即使您没有使用多处理,您当然也会收到重复的 id 错误。或者你有参数列表吗?我可以假设这是 Linux/Unix 吗?
好的,我已经更新了。不,这不是我的实际代码,使它不那么困难。我添加了一个示例 params dict。是的,我正在使用 Linux。
你真的看过Why Not Use TinyDB?
查看@HTF 的评论,然后在下面查看我的答案,该答案验证了多处理和多线程与TinyDB
不兼容。我让它工作,但不得不使用Lock
序列化插入,这违背了多处理的整个目的。
【参考方案1】:
如果您想并行化数据写入,则需要将问题分解为更小的步骤,以便确保插入数据的步骤已经将所有内容合并在一起。这样你就不会在写入时遇到任何(明显的)线程安全问题。
例如,假设您的 JSON 文件具有三个字段,email
、name
和 age
,并且您希望对 email
强制唯一性,但有些记录是双输入的。例如,有一个条目有emily@smith.com
和她的名字,另一个有她的年龄。
您首先要制作一些东西来将所有内容组合在一起,然后并行化写入。
我会画一些代码(注意我没有测试过这个!):
my_data = # some JSON data
grouped =
for datum in my_data:
if datum['email'] in grouped:
grouped[datum['email']].update(datum)
else:
grouped[datum['email']] = datum
# parallelize write as above
【讨论】:
我猜问题不在于并行化。所以 tinyDB 给每个数据条目一个唯一的“id”(其余的值不是唯一的),但是因为我使用多个进程,有时他会尝试同时添加数据。这导致两个进程都想添加一个 id 为 5 的数据条目,但最后一个进程收到错误,因为这个 id 已经存在。【参考方案2】:我无法在我有权访问的 Linux 系统上对此进行测试,因为它是一个共享服务器,运行代码所需的某些设施已被禁止访问。这是下面的 Windows 版本。但主要特点是:
-
它使用
Lock
来确保插入是序列化的,我认为这对于它运行而不会出错是必要的。当然,这违背了将代码并行化的目的,并且可以得出结论,使用多处理或多线程确实没有意义。
在 Windows 中,我不必将 resultsDb = TinyDB('db.json')
语句移动到 run
函数内,因为在 spawn
用于创建新进程的平台上,例如 Windows,如果我将该语句留在全局范围内无论如何,它都会为每个新创建的进程执行。然而,对于 Linux,fork
用于创建新进程,它不会为每个新进程执行,而是每个新进程将继承由主进程打开的单个数据库。这可能有效,也可能无效——您可以在全局范围内尝试两种方式的语句。如果您将其放回全局范围以查看它是否在那里工作,您不需要在源代码的底部进行相同的声明。
from tinydb import TinyDB
from multiprocessing import Process, Lock
def run(lock, params):
resultsDb = TinyDB('db/resultsDb.json')
with lock:
resultsDb.insert('id': params['id'], 'name': params['name'], 'age': params['age'])
print('Successfully inserted.')
# required by Windows:
if __name__ == '__main__':
params = 'id': 1, 'name': 'poop', 'age': 99
maxProcesses = 12 # Cores in my pc
lock = Lock()
processes = []
for i in range(maxProcesses):
processes.append(Process(target=run, args=(lock, params)))
for p in processes:
p.start()
for p in processes:
p.join()
# remove the following if the first one is at global scope:
resultsDb = TinyDB('db/resultsDb.json')
print(resultsDb.all())
打印:
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
['id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99]
【讨论】:
尽管有我的描述,我实际上将resultsDb = TinyDB('db/resultsDb.json')
作为一个全局声明,当我打算将它移动到函数 run
中时(正如我所说,它在 Windows 上双向工作,我对它进行了双向测试)。我已经更新了源代码,使其符合我上面的描述。以上是关于如何在使用多处理时将数据添加到 json 文件?的主要内容,如果未能解决你的问题,请参考以下文章
如何在节点js中向azure服务总线队列发送消息时将内容类型指定为application/json?
处理来自服务器的 JSON 数据并添加/更新 CoreData 中的对象