How to count word frequencies in a huge file concurrently?

【Title】How to count word frequencies in a huge file concurrently? 【Posted】2019-07-31 01:31:58 【Question】:

I need to count the word frequencies of a 3 GB gzipped plain-text file of English sentences, which is about 30 GB after decompression.

I have a single-threaded script using collections.Counter and gzip.open; it takes hours to finish.

Since reading the file line by line is much faster than the splitting and counting, I am thinking about a producer-consumer flow: a file reader produces lines, several consumers do the splitting and counting, and the Counters are merged at the end to get the word occurrences.

However, I cannot find an example of ProcessPoolExecutor where a queue is sent to the Executor; the examples only map single items from a list. And there are only single-threaded examples for asyncio.Queue.

Since it is a huge file, I cannot read the whole thing into a list before counting, so I cannot use concurrent.futures.Executor.map. Yet all the examples I have read start from a fixed list.

The time to split and count one sentence is comparable to forking a process, so I have to make each consumer process live longer. I don't think map can merge Counters, so I cannot use chunksize > 1. That means I have to give the consumers a queue and let them keep counting until the whole file is finished. But most examples send only one item to each consumer and use chunksize=1000 to reduce the number of forks.

Could you write an example for me?

I hope the code is backward compatible with Python 3.5.3, since PyPy is faster.
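(For reference, a minimal sketch of the producer-consumer layout described above: one reader process feeds chunks of lines into a multiprocessing.Queue, and long-lived workers each build a local Counter that is merged at the end. The file name, chunk size and worker count are placeholders, and this is an untested illustration, not a finished solution.)

from collections import Counter
from multiprocessing import Process, Queue
import gzip

NWORKERS = 4                 # placeholder values
SENTINEL = None

def consumer(line_q, result_q):
    # Long-lived worker: keep splitting and counting until the sentinel arrives.
    local = Counter()
    for chunk in iter(line_q.get, SENTINEL):
        for line in chunk:
            local.update(line.split())
    result_q.put(local)

def producer(path, line_q):
    # Single reader: stream the gzipped file and hand out chunks of lines.
    with gzip.open(path, 'rt') as f:
        while True:
            chunk = f.readlines(1024 * 1024)   # about 1 MB of lines per chunk
            if not chunk:
                break
            line_q.put(chunk)
    for _ in range(NWORKERS):
        line_q.put(SENTINEL)

if __name__ == '__main__':
    line_q, result_q = Queue(maxsize=32), Queue()
    workers = [Process(target=consumer, args=(line_q, result_q)) for _ in range(NWORKERS)]
    for w in workers:
        w.start()
    producer('sentences.txt.gz', line_q)     # placeholder file name
    total = Counter()
    for _ in workers:
        total += result_q.get()              # merge the per-worker Counters
    for w in workers:
        w.join()
    print(total.most_common(10))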


My real case is for a more specific file format:

chr1    10011   141     0       157     4       41      50
chr1    10012   146     1       158     4       42      51
chr1    10013   150     0       163     4       43      53
chr1    10014   164     3       167     4       44      54

I need to compute a histogram for each single column, from column 3 to column 8, so I took word frequencies as an example.

My code is:

#!/usr/bin/env pypy3
import sys

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

def main():
    import math

    if len(sys.argv) < 3 :
        print('Usage:',sys.argv[0],'<samtools.depth.gz> <out.tsv> [verbose=0]',file=sys.stderr,flush=True)
        exit(0)
    try:
        verbose = int(sys.argv[3])
    except: # `except IndexError:` and `except ValueError:`
        verbose = 0

    inDepthFile = sys.argv[1]
    outFile = sys.argv[2]
    print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile,outFile,verbose),file=sys.stderr,flush=True)
    RecordCnt,MaxDepth,cDepthCnt,cDepthStat = inStat(inDepthFile,verbose)
    for k in SamplesList:
        cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt # E(X)
        cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt # E(X^2)
        cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2])   # E(X^2)-E(X)^2
    tsvout = open(outFile, 'wt')
    print('#{}\t{}'.format('Depth','\t'.join(SamplesList)),file=tsvout)
    #RecordCntLength = len(str(RecordCnt))
    print( '#N={},SD:\t{}'.format(RecordCnt,'\t'.join(str(round(cDepthStat[col][4],1)) for col in SamplesList)),file=tsvout)
    for depth in range(0,MaxDepth+1):
        print( '{}\t{}'.format(depth,'\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)),file=tsvout)
    tsvout.close()
    pass

def inStat(inDepthFile,verbose):
    import gzip
    import csv
    from collections import Counter
    # Looking up things in global scope takes longer than looking up stuff in local scope. <https://***.com/a/54645851/159695>
    cDepthCnt = {key:Counter() for key in SamplesList}
    cDepthStat = {key:[0,0,0,0,0] for key in SamplesList} # x and x^2
    RecordCnt = 0
    MaxDepth = 0
    with gzip.open(inDepthFile, 'rt') as tsvin:
        tsvin = csv.DictReader(tsvin, delimiter='\t', fieldnames=('ChrID','Pos')+SamplesList )
        try:
            for row in tsvin:
                RecordCnt += 1
                for k in SamplesList:
                    theValue = int(row[k])
                    if theValue > MaxDepth:
                        MaxDepth = theValue
                    cDepthCnt[k][theValue] += 1  # PyPy3:29.82 ns, Python3:30.61 ns
                    cDepthStat[k][0] += theValue
                    cDepthStat[k][1] += theValue * theValue
                #print(MaxDepth,DepthCnt)
        except KeyboardInterrupt:
            print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
            pass
        print('[!]Lines Read:[{}], MaxDepth is [{}].'.format(RecordCnt,MaxDepth),file=sys.stderr,flush=True)
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

if __name__ == "__main__":
    main()  # time python3 ./samdepthplot.py t.tsv.gz 1

csv.DictReader takes most of the time.
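(A possible way to cut that overhead, not part of the code above, is to drop csv.DictReader and split each tab-separated line by hand, assuming the fixed column layout shown earlier:)

from collections import Counter

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

def count_line(line, cDepthCnt):
    # Manual tab-splitting: fields[0] is ChrID, fields[1] is Pos,
    # fields[2:8] are the six depth columns in SamplesList order.
    fields = line.rstrip('\n').split('\t')
    for key, value in zip(SamplesList, fields[2:8]):
        cDepthCnt[key][int(value)] += 1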


My problem is that, although the gzip reader is fast and the csv reader is fast too, I need to process billions of lines, and the csv reader is surely slower than the gzip reader.

So I need to spread the lines to different worker processes running the csv reader and do the downstream counting separately. It is convenient to use a queue between one producer and several consumers.

Since I am using Python rather than C, is there some abstract wrapper for multiprocessing and queues? Is it possible to use ProcessPoolExecutor together with the Queue class?

【Question comments】:

I know you only briefly mentioned how you are doing this, but could you include the code you are currently using?

I wonder whether you might get better performance just using a shell pipeline? See Command-line Tools can be 235x Faster than your Hadoop Cluster. This problem sounds like a great fit for xargs and uniq -c, maybe with some awk scripting to glue it all together.

Have you considered using io.BufferedReader? As described in Reading & Writing GZIP Files Faster in Python.

You could treat the gzipped file as a giant random-access list of lines without reading the whole thing into memory, using something similar to what is being done in this answer, only with an mmap instead of a temporary file (I have an un-posted version that does this). The memory map could then be passed to multiple concurrent subprocesses along with a starting line number and a line count. Each subprocess could count the words in the section assigned to it and pass back a dictionary when finished. These dictionaries could all be merged together.

I have included my code now.
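(A small illustration of the io.BufferedReader suggestion above: wrap the gzip stream in a larger read buffer before iterating over lines. The buffer size and file name here are arbitrary, and this is only a sketch.)

import gzip
import io
from collections import Counter

counts = Counter()
# Wrap the decompressing stream in a big read buffer before splitting lines.
with io.BufferedReader(gzip.open('sentences.txt.gz', 'rb'), buffer_size=1024 * 1024) as f:
    for line in f:                 # bytes lines
        counts.update(line.split())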

【Answer 1】:

I have never tested this code, but it should work.

First, check the number of lines:

f = 'myfile.txt'
def file_len(f):
    with open(f) as f:
        for i, l in enumerate(f):
            pass
    return i + 1
num_lines = file_len(f)

Split the data into n partitions:

n = 8  # number of worker processes, for example
split_size = num_lines//n if num_lines//n > 0 else 1
parts = [x for x in range(0, num_lines, split_size)]

Now start the jobs:

from multiprocessing import Process
import linecache
jobs = []

for part in range(len(parts)):
    p = Process(target = function_here, args = ('myfile.txt', parts[part], split_size))
    jobs.append(p)
    p.start()

for p in jobs:
    p.join()

An example function:

def function_here(your_file_name, line_number, split_size):

    for current_line in range(line_number + 1, line_number + split_size + 1):  # linecache line numbers start at 1
        print( linecache.getline(your_file_name, current_line))

Still, you will need to check the number of lines before doing anything else.
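(A minimal ProcessPoolExecutor variant of the same partitioning idea, in which each worker counts the words in its own slice of lines and returns a Counter that is merged in the parent. The file name and pool size are placeholders and this is untested.)

from collections import Counter
from concurrent.futures import ProcessPoolExecutor
from itertools import islice

FILENAME = 'myfile.txt'

def count_slice(start, size):
    # Each worker re-opens the file, skips to its slice and counts words.
    counts = Counter()
    with open(FILENAME) as f:
        for line in islice(f, start, start + size):
            counts.update(line.split())
    return counts

if __name__ == '__main__':
    num_lines = sum(1 for _ in open(FILENAME))   # same job as file_len above
    n = 8
    split_size = num_lines // n if num_lines // n > 0 else 1
    parts = range(0, num_lines, split_size)
    total = Counter()
    with ProcessPoolExecutor(max_workers=n) as executor:
        for counter in executor.map(count_slice, parts, [split_size] * len(parts)):
            total += counter
    print(total.most_common(10))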

【Comments】:

I read that ProcessPoolExecutor is a simplified abstraction over multiprocessing. Would it be simpler to use ProcessPoolExecutor this way?

【Answer 2】:

A 30 GB text file is big enough to put your question into big-data territory. So, to tackle this problem, I suggest using big-data tools such as Hadoop and Spark. The "producer-consumer flow" you explained is basically what the MapReduce algorithm was designed for, and word frequency counting is a typical MapReduce problem. Look it up and you will find plenty of examples.
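(As an illustration, the classic word-count job in PySpark looks roughly like this; the input path is a placeholder and a Spark installation is assumed. Note that a single .gz file is not splittable, so for real parallelism you would decompress it or store it in splittable chunks first.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('wordcount').getOrCreate()

# textFile decompresses gzip transparently; the work is distributed by Spark.
lines = spark.sparkContext.textFile('hdfs:///data/sentences.txt.gz')
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))

for word, count in counts.takeOrdered(20, key=lambda x: -x[1]):
    print(word, count)

spark.stop()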

【Comments】:

【Answer 3】:

The idea is to break the huge file into smaller files, call many workers that do the counting work and return Counters, and merge the Counters at the end.

from itertools import islice
from multiprocessing import Pool
from collections import Counter
import os

NUM_OF_LINES = 3
INPUT_FILE = 'huge.txt'
POOL_SIZE = 10


def slice_huge_file():
    cnt = 0
    with open(INPUT_FILE) as f:
        while True:
            next_n_lines = list(islice(f, NUM_OF_LINES))
            cnt += 1
            if not next_n_lines:
                break
            with open('sub_huge_{}.txt'.format(cnt), 'w') as out:
                out.writelines(next_n_lines)


def count_file_words(input_file):
    with open(input_file, 'r') as f:
        return Counter([w.strip() for w in f.readlines()])


if __name__ == '__main__':
    slice_huge_file()
    pool = Pool(POOL_SIZE)
    sub_files = [os.path.join('.',f) for f in os.listdir('.') if f.startswith('sub_huge')]
    results = pool.map(count_file_words, sub_files)
    final_counter = Counter()
    for counter in results:
        final_counter += counter
    print(final_counter)

【Comments】:

Rather than splitting the file into new files, you could seek to the required file positions and process the chunks in parallel; that would save a lot of I/O. The solution above is quite resource-hungry for a large file.
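(A sketch of that seek-based idea, assuming the input has already been decompressed, since a gzip stream cannot be seeked randomly. Each worker handles one byte range and aligns itself to line boundaries so every line is counted exactly once; names and sizes are placeholders.)

import os
from collections import Counter
from multiprocessing import Pool

FILENAME = 'huge.txt'      # decompressed input, placeholder name
NPROCS = 8

def count_range(byte_range):
    start, end = byte_range
    counts = Counter()
    with open(FILENAME, 'rb') as f:
        f.seek(start)
        if start != 0:
            f.readline()               # skip the partial line; the previous worker reads it
        while f.tell() <= end:
            line = f.readline()
            if not line:
                break
            counts.update(line.split())
    return counts

if __name__ == '__main__':
    size = os.path.getsize(FILENAME)
    step = size // NPROCS + 1
    ranges = [(offset, min(offset + step, size)) for offset in range(0, size, step)]
    total = Counter()
    with Pool(NPROCS) as pool:
        for partial in pool.imap_unordered(count_range, ranges):
            total += partial
    print(total.most_common(10))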

【Answer 4】:

Just some pseudocode:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
import traceback


WORKER_POOL_SIZE = 10  # you should set this as the number of your processes
QUEUE_SIZE = 100       # 10 times your pool size is good enough


def main():
    with Manager() as manager:
        q = manager.Queue(QUEUE_SIZE)

        # init worker pool
        executor = ProcessPoolExecutor(max_workers=WORKER_POOL_SIZE)
        workers_pool = [executor.submit(worker, i, q) for i in range(WORKER_POOL_SIZE)]

        # start producer
        run_producer(q)

        # wait until the workers are done
        for f in workers_pool:
            try:
                f.result()
            except Exception:
                traceback.print_exc()


def run_producer(q):
    try:
        with open("your file path") as fp:
            for line in fp:
                q.put(line)
    except Exception:
        traceback.print_exc()
    finally:
        q.put(None)



def worker(i, q):
    while 1:
        line = q.get()
        if line is None:
            print(f'worker {i} is done')
            q.put(None)
            return

        # do something with this line
        # ...

【Comments】:

How do I get the results after the workers are done? I got lots of <Future at 0x1078d3780 state=running> but no results.
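(One way to get them back, shown here only as a sketch against the pseudocode above: have each worker build and return its own Counter, then merge the futures' results in the main process with f.result().)

from collections import Counter

def worker(i, q):
    # Build a local Counter instead of only consuming lines.
    counts = Counter()
    for line in iter(q.get, None):
        counts.update(line.split())
    q.put(None)                # pass the sentinel on to the next worker
    return counts

# In main(), after run_producer(q):
#     total = Counter()
#     for f in workers_pool:
#         total += f.result()  # blocks until that worker has returned its Counter
#     print(total.most_common(10))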

【Answer 5】:

I studied the multiprocessing library over the weekend.

The feature of stopping on Ctrl+C and writing out the current results still does not work.

The main functionality works fine now.

#!/usr/bin/env pypy3
import sys
from collections import Counter
from multiprocessing import Pool, Process, Manager, current_process, freeze_support

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

ChunkSize = 1024 * 128
verbose = 0
Nworkers = 16

def main():
    import math

    if len(sys.argv) < 3 :
        print('Usage:',sys.argv[0],'<samtools.depth.gz> <out.tsv> [verbose=0]',file=sys.stderr,flush=True)
        exit(0)
    try:
        verbose = int(sys.argv[3])
    except: # `except IndexError:` and `except ValueError:`
        verbose = 0

    inDepthFile = sys.argv[1]
    outFile = sys.argv[2]
    print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile,outFile,verbose),file=sys.stderr,flush=True)
    RecordCnt,MaxDepth,cDepthCnt,cDepthStat = CallStat(inDepthFile)
    for k in SamplesList:
        cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt # E(X)
        cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt # E(X^2)
        cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2])   # E(X^2)-E(X)^2
    tsvout = open(outFile, 'wt')
    print('#{}\t{}'.format('Depth','\t'.join(SamplesList)),file=tsvout)
    #RecordCntLength = len(str(RecordCnt))
    print( '#N={},SD:\t{}'.format(RecordCnt,'\t'.join(str(round(cDepthStat[col][4],1)) for col in SamplesList)),file=tsvout)
    for depth in range(0,MaxDepth+1):
        #print( '{}\t{}'.format(depth,'\t'.join(str(DepthCnt[col][depth]) for col in SamplesList)) )
        #print( '{}\t{}'.format(depth,'\t'.join(str(yDepthCnt[depth][col]) for col in SamplesList)) )
        print( '{}\t{}'.format(depth,'\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)),file=tsvout)
        #pass
    #print('#MaxDepth={}'.format(MaxDepth),file=tsvout)
    tsvout.close()
    pass

def CallStat(inDepthFile):
    import gzip
    import itertools
    RecordCnt = 0
    MaxDepth = 0
    cDepthCnt = {key:Counter() for key in SamplesList}
    cDepthStat = {key:[0,0,0,0,0] for key in SamplesList} # x and x^2
    #lines_queue = Queue()
    manager = Manager()
    lines_queue = manager.Queue()
    stater_pool = Pool(Nworkers)
    TASKS = itertools.repeat((lines_queue,SamplesList),Nworkers)
    #ApplyResult = [stater_pool.apply_async(iStator,x) for x in TASKS]
    #MapResult = stater_pool.map_async(iStator,TASKS,1)
    AsyncResult = stater_pool.imap_unordered(iStator,TASKS,1)
    try:
        with gzip.open(inDepthFile, 'rt') as tsvfin:
            while True:
                lines = tsvfin.readlines(ChunkSize)
                lines_queue.put(lines)
                if not lines:
                    for i in range(Nworkers):
                        lines_queue.put(b'\n\n')
                    break
    except KeyboardInterrupt:
        print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
        for i in range(Nworkers):
            lines_queue.put(b'\n\n')
        pass
    #for results in ApplyResult:
        #(iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) = results.get()
    #for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in MapResult.get():
    for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in AsyncResult:
        RecordCnt += iRecordCnt
        if iMaxDepth > MaxDepth:
            MaxDepth = iMaxDepth
        for k in SamplesList:
            cDepthCnt[k].update(icDepthCnt[k])
            cDepthStat[k][0] += icDepthStat[k][0]
            cDepthStat[k][1] += icDepthStat[k][1]
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

#def iStator(inQueue,inSamplesList):
def iStator(args):
    (inQueue,inSamplesList) = args
    import csv
    # Looking up things in global scope takes longer than looking up stuff in local scope. <https://***.com/a/54645851/159695>
    cDepthCnt = {key:Counter() for key in inSamplesList}
    cDepthStat = {key:[0,0] for key in inSamplesList} # x and x^2
    RecordCnt = 0
    MaxDepth = 0
    for lines in iter(inQueue.get, b'\n\n'):
        try:
            tsvin = csv.DictReader(lines, delimiter='\t', fieldnames=('ChrID','Pos')+inSamplesList )
            for row in tsvin:
                #print(', '.join(row[col] for col in inSamplesList))
                RecordCnt += 1
                for k in inSamplesList:
                    theValue = int(row[k])
                    if theValue > MaxDepth:
                        MaxDepth = theValue
                    #DepthCnt[k][theValue] += 1  # PyPy3:30.54 ns, Python3:22.23 ns
                    #yDepthCnt[theValue][k] += 1 # PyPy3:30.47 ns, Python3:21.50 ns
                    cDepthCnt[k][theValue] += 1  # PyPy3:29.82 ns, Python3:30.61 ns
                    cDepthStat[k][0] += theValue
                    cDepthStat[k][1] += theValue * theValue
                #print(MaxDepth,DepthCnt)
        except KeyboardInterrupt:
            print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
            pass
        #print('[!]{} Lines Read:[{}], MaxDepth is [{}].'.format(current_process().name,RecordCnt,MaxDepth),file=sys.stderr,flush=True)
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

if __name__ == "__main__":
    main()  # time python3 ./samdepthplot.py t.tsv.gz 1

【Comments】:
