拆分大型 json 文件并将每个部分分配给一个进程

Posted

技术标签:

【中文标题】拆分大型 json 文件并将每个部分分配给一个进程【英文标题】:Split large json file and assign each part to a process 【发布时间】:2021-12-31 15:19:27 【问题描述】:

我正在尝试优化具有多个进程的大型 JSON 数据集 ~ 14 GiB(1200 万行),以便我可以更快。我创建了两个multiprocessing.Queue 实例,in_qout_q。他们将读取data-2021-09-29.jsonl 文件中的数据(它是数据集),并输出包含我感兴趣的数据的行并将其写入另一个文件stokes_DE_file.jsonl(这是我正在改进的部分数据集,输出文件是数据集的细化版本)。我的机器上有 16 个 CPU,假设我想将文件分成 16 个部分,行数将是变量 LINES_PER_PROCESS。如何为每个进程分配文件的一部分?以下是我到目前为止编写的代码。 刚从 python 中的多处理模块开始。这是我的代码:

import multiprocessing as mp
import threading
import json
import reverse_geocoder as rg

LINES_PER_PROCESS = 12137928/(mp.cpu_count()-1)

def geocode_worker(in_q, out_q):
    while True:
        strike = in_q.get()
        if strike is None:
            out_q.put(None)
            return
        strike_location = (strike['lat'], strike['lon'])
        if rg.search(strike_location)[0]['cc'] == 'DE':
            out_q.put('\n'.format(strike))

def file_write_worker(out_q, fileobj, worker_count):
    while worker_count:
        for msg in iter(out_q.get, None):
            
            if msg is None:
                worker_count -= 1
            fileobj.write(msg)

def get_germany_strokes(jsonl_file):
    
    worker_count = mp.cpu_count() - 1
    in_q, out_q = mp.Queue(), mp.Queue()
    processes = [mp.Process(target=geocode_worker, args=(in_q, out_q)) for _ in range(worker_count)]
    
    for p in processes:
        p.start()
        
    with open('strokes_DE_file.json', newline='') as strokes_DE_file:
        file_writer = threading.Thread(target=file_write_worker, args=(out_q, strokes_DE_file, worker_count))
        file_writer.start()
        
    with open(jsonl_file, newline='') as file:
        next(file)
        for line in file:
            strokes = json.loads(line)['strokes']
            for strike in strokes:
                in_q.put(strike)
                    
get_germany_strokes('data-2021-09-29.jsonl')

【问题讨论】:

您的问题到底是什么?您的问题含糊不清,我们不知道您到底需要什么,但不知道您目前正在尝试解决什么错误/异常/问题。 我的问题是:我如何将文件拆分为 15 个部分并将每个部分分配给一个进程,以便每个进程可以过滤分配给他的数据并将其写入输出文件。 您的数据是 json 列表还是非常大的 json?这很重要。 无论如何您都可以看到:github.com/kashifrazzaqui/json-streamer 它可以帮助您流式传输 json 文件。读取你认为可行的每 N 个对象,然后使用多处理处理这 N 个对象。我还建议使用带有pool.map 变体之一的简单multiprocessing.Pool 实例。 【参考方案1】:

我找到的解决方案是在 Linux 中使用 split -l 命令拆分我的数据集,并指定每个子集的行数。我将每个子集分配给一个进程,总共 15 个进程大约是要创建的最佳进程数(考虑开销/负载平衡)。

【讨论】:

以上是关于拆分大型 json 文件并将每个部分分配给一个进程的主要内容,如果未能解决你的问题,请参考以下文章

嵌套GNU并行处理多个大文件并将每个文件数据拆分为队列

解析大型 JSON 文件 [重复]

在 Nodejs 中解析大型 JSON 文件并独立处理每个对象

如何将 json 输出保存在 String 对象中。重复数据写入所有拆分的文件中?

从 Derby 获取 PreparedStatement 查询

PIG 脚本根据特定单词将大型文本文件拆分为多个部分