拆分大型 json 文件并将每个部分分配给一个进程
Posted
技术标签:
【中文标题】拆分大型 json 文件并将每个部分分配给一个进程【英文标题】:Split large json file and assign each part to a process 【发布时间】:2021-12-31 15:19:27 【问题描述】:我正在尝试优化具有多个进程的大型 JSON 数据集 ~ 14 GiB(1200 万行),以便我可以更快。我创建了两个multiprocessing.Queue
实例,in_q
和out_q
。他们将读取data-2021-09-29.jsonl
文件中的数据(它是数据集),并输出包含我感兴趣的数据的行并将其写入另一个文件stokes_DE_file.jsonl
(这是我正在改进的部分数据集,输出文件是数据集的细化版本)。我的机器上有 16 个 CPU,假设我想将文件分成 16 个部分,行数将是变量 LINES_PER_PROCESS。如何为每个进程分配文件的一部分?以下是我到目前为止编写的代码。
刚从 python 中的多处理模块开始。这是我的代码:
import multiprocessing as mp
import threading
import json
import reverse_geocoder as rg
LINES_PER_PROCESS = 12137928/(mp.cpu_count()-1)
def geocode_worker(in_q, out_q):
while True:
strike = in_q.get()
if strike is None:
out_q.put(None)
return
strike_location = (strike['lat'], strike['lon'])
if rg.search(strike_location)[0]['cc'] == 'DE':
out_q.put('\n'.format(strike))
def file_write_worker(out_q, fileobj, worker_count):
while worker_count:
for msg in iter(out_q.get, None):
if msg is None:
worker_count -= 1
fileobj.write(msg)
def get_germany_strokes(jsonl_file):
worker_count = mp.cpu_count() - 1
in_q, out_q = mp.Queue(), mp.Queue()
processes = [mp.Process(target=geocode_worker, args=(in_q, out_q)) for _ in range(worker_count)]
for p in processes:
p.start()
with open('strokes_DE_file.json', newline='') as strokes_DE_file:
file_writer = threading.Thread(target=file_write_worker, args=(out_q, strokes_DE_file, worker_count))
file_writer.start()
with open(jsonl_file, newline='') as file:
next(file)
for line in file:
strokes = json.loads(line)['strokes']
for strike in strokes:
in_q.put(strike)
get_germany_strokes('data-2021-09-29.jsonl')
【问题讨论】:
您的问题到底是什么?您的问题含糊不清,我们不知道您到底需要什么,但不知道您目前正在尝试解决什么错误/异常/问题。 我的问题是:我如何将文件拆分为 15 个部分并将每个部分分配给一个进程,以便每个进程可以过滤分配给他的数据并将其写入输出文件。 您的数据是 json 列表还是非常大的 json?这很重要。 无论如何您都可以看到:github.com/kashifrazzaqui/json-streamer 它可以帮助您流式传输 json 文件。读取你认为可行的每 N 个对象,然后使用多处理处理这 N 个对象。我还建议使用带有pool.map
变体之一的简单multiprocessing.Pool
实例。
【参考方案1】:
我找到的解决方案是在 Linux 中使用 split -l
命令拆分我的数据集,并指定每个子集的行数。我将每个子集分配给一个进程,总共 15 个进程大约是要创建的最佳进程数(考虑开销/负载平衡)。
【讨论】:
以上是关于拆分大型 json 文件并将每个部分分配给一个进程的主要内容,如果未能解决你的问题,请参考以下文章
在 Nodejs 中解析大型 JSON 文件并独立处理每个对象
如何将 json 输出保存在 String 对象中。重复数据写入所有拆分的文件中?