如何在使用多处理时将数据添加到 json 文件?

Posted

技术标签:

【中文标题】如何在使用多处理时将数据添加到 json 文件?【英文标题】:How to add data to a json file while making use of multiproccesing? 【发布时间】:2021-07-12 07:50:10 【问题描述】:

我正在使用一个用户友好的基于 json 的面向文档的数据库,名为 TinyDB。但是我无法将多条数据添加到我的数据库中,因为我正在使用多处理。过了一会儿,我得到了数据库中已经存在 id x 的错误(这是因为 2 个或更多进程正在尝试同时添加数据)。有没有办法解决这个问题?

每次运行我都会插入新的唯一参数。

示例参数:

params = 'id' = 1, 'name': 'poop', 'age': 99

代码:

resultsDb = TinyDB('db/resultsDb.json')

def run(params):
    resultsDb.insert('id': params['id'], 'name': params['name'], 'age': params['age'])

maxProcesses = 12 # Cores in my pc

for i in range(maxProcesses):
    processes.append(Process(target=run, args=(params,)))

for p in processes:
    p.start()

for p in processes:
    p.join()

【问题讨论】:

首先,您在params['id] 中缺少'。其次,您的代码将相同的 params 值(您没有显示)传递给所有 12 个进程。这真的是您的实际代码吗?如果是这样,即使您没有使用多处理,您当然也会收到重复的 id 错误。或者你有参数列表吗?我可以假设这是 Linux/Unix 吗? 好的,我已经更新了。不,这不是我的实际代码,使它不那么困难。我添加了一个示例 params dict。是的,我正在使用 Linux。 你真的看过Why Not Use TinyDB? 查看@HTF 的评论,然后在下面查看我的答案,该答案验证了多处理和多线程与TinyDB 不兼容。我让它工作,但不得不使用Lock 序列化插入,这违背了多处理的整个目的。 【参考方案1】:

如果您想并行化数据写入,则需要将问题分解为更小的步骤,以便确保插入数据的步骤已经将所有内容合并在一起。这样你就不会在写入时遇到任何(明显的)线程安全问题。

例如,假设您的 JSON 文件具有三个字段,emailnameage,并且您希望对 email 强制唯一性,但有些记录是双输入的。例如,有一个条目有emily@smith.com 和她的名字,另一个有她的年龄。

您首先要制作一些东西来将所有内容组合在一起,然后并行化写入。

我会画一些代码(注意我没有测试过这个!):

my_data = # some JSON data
grouped = 

for datum in my_data:
    if datum['email'] in grouped:
        grouped[datum['email']].update(datum)
    else:
        grouped[datum['email']] = datum

# parallelize write as above

【讨论】:

我猜问题不在于并行化。所以 tinyDB 给每个数据条目一个唯一的“id”(其余的值不是唯一的),但是因为我使用多个进程,有时他会尝试同时添加数据。这导致两个进程都想添加一个 id 为 5 的数据条目,但最后一个进程收到错误,因为这个 id 已经存在。【参考方案2】:

我无法在我有权访问的 Linux 系统上对此进行测试,因为它是一个共享服务器,运行代码所需的某些设施已被禁止访问。这是下面的 Windows 版本。但主要特点是:

    它使用Lock 来确保插入是序列化的,我认为这对于它运行而不会出错是必要的。当然,这违背了将代码并行化的目的,并且可以得出结论,使用多处理或多线程确实没有意义。 在 Windows 中,我不必将 resultsDb = TinyDB('db.json') 语句移动到 run 函数内,因为在 spawn 用于创建新进程的平台上,例如 Windows,如果我将该语句留在全局范围内无论如何,它都会为每个新创建的进程执行。然而,对于 Linux,fork 用于创建新进程,它不会为每个新进程执行,而是每个新进程将继承由主进程打开的单个数据库。这可能有效,也可能无效——您可以在全局范围内尝试两种方式的语句。如果您将其放回全局范围以查看它是否在那里工作,您不需要在源代码的底部进行相同的声明。
from tinydb import TinyDB
from multiprocessing import Process, Lock


def run(lock, params):
    resultsDb = TinyDB('db/resultsDb.json')
    with lock:
        resultsDb.insert('id': params['id'], 'name': params['name'], 'age': params['age'])
    print('Successfully inserted.')

# required by Windows:
if __name__ == '__main__':
    params = 'id': 1, 'name': 'poop', 'age': 99

    maxProcesses = 12 # Cores in my pc

    lock = Lock()
    processes = []
    for i in range(maxProcesses):
        processes.append(Process(target=run, args=(lock, params)))

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    # remove the following if the first one is at global scope:
    resultsDb = TinyDB('db/resultsDb.json')
    print(resultsDb.all())

打印:

Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
['id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99, 'id': 1, 'name': 'poop', 'age': 99]

【讨论】:

尽管有我的描述,我实际上将 resultsDb = TinyDB('db/resultsDb.json') 作为一个全局声明,当我打算将它移动到函数 run 中时(正如我所说,它在 Windows 上双向工作,我对它进行了双向测试)。我已经更新了源代码,使其符合我上面的描述。

以上是关于如何在使用多处理时将数据添加到 json 文件?的主要内容,如果未能解决你的问题,请参考以下文章

如何在节点js中向azure服务总线队列发送消息时将内容类型指定为application/json?

如何在.NET 运行时将文件夹添加到程序集搜索路径?

处理来自服务器的 JSON 数据并添加/更新 CoreData 中的对象

快速点击按钮时将单元格添加到表格视图

如何在使用“delims =”作为扩展列表时将搜索目录包含到 FOR-DO 循环中

如何在通过 log4j 创建新日志文件时将日志添加到 Syslog?