如何在 fasta 文件中并行化计算,其中每个处理器采用一个序列

Posted

技术标签:

【中文标题】如何在 fasta 文件中并行化计算,其中每个处理器采用一个序列【英文标题】:How I can parallelize computations in a fasta file, where each processor takes one sequence 【发布时间】:2019-06-06 15:43:59 【问题描述】:

我不知道如何并行化 Python 中的代码,该代码获取 FASTA 文件的每一行并对其进行一些统计,例如计算 GC 内容。您是否有一些技巧或库可以帮助我减少执行时间?

我尝试使用 os.fork(),但它比顺序代码给了我更多的执行时间。可能是因为我不太清楚如何给每个孩子一个不同的顺序。

#Computing GC Content
from Bio import SeqIO                  
with open('chr1.fa', 'r') as f:
    records = list (SeqIO.parse(f,'fasta'))
    GC_for_sequence=[]
    for i in records:
        GC=0
        for j in i:
            if j in "GC":
                GC+=1
        GC_for_sequence.append(GC/len(i))
    print(GC_for_sequence)

预期的执行是:每个进程采用一个序列,它们并行进行统计。

【问题讨论】:

【参考方案1】:

这是标准multiprocessing 模块的一个想法:

from multiprocessing import Pool
import numpy as np

no_cores_to_use = 4

GC_for_sequence = [np.random.rand(100) for x in range(10)]

with Pool(no_cores_to_use) as pool:
    result = pool.map(np.average, GC_for_sequence)

print(result)

在代码中,我使用numpy 模块来模拟包含一些内容的列表。 pool.map 将要在数据上使用的函数作为第一个参数,将数据列表作为第二个参数。您可以轻松定义自己的功能。默认情况下,它应该采用单个参数。如果您想通过更多,请使用functools.partial

[编辑] 这是一个更接近您的问题的示例:

from multiprocessing import Pool
import numpy as np

records = ['ACTGTCGCAGC' for x in range(10)]
no_cores_to_use = 4

def count(sequence):
    count = sequence.count('GC')
    return count

with Pool(no_cores_to_use) as pool:
    result = pool.map(count, records)

print(sum(result))

【讨论】:

好的!但是如果我想放置大量不同的序列,我是否需要将它们放在一个列表中,就像您在记录中所做的那样? 从您的代码看来,您从一个列表 (list (SeqIO.parse(f,'fasta'))) 开始,因此无需做任何额外的事情。 如果我将数据添加为标准输入并制作文件的“猫”(在外壳中),该模块将起作用吗? 你可以让它工作,但它会非常低效。检查这个答案:***.com/questions/7654971/… 简而言之,要么使用那里给出的显式生成器,要么使用迭代器 SeqIO.parse("filename", "fasta") 在你的代码中,你将生成器变成列表。别。只需将迭代器传递给map【参考方案2】:

关于您现有代码的几点说明:

    我建议不要这样做:list (SeqIO.parse(…)),因为这将暂停执行,直到所有序列都加载到内存中,你最好(内存和总执行时间)将其保留为迭代器和根据需要将元素消耗给工人

    循环遍历每个字符非常慢,使用 str.count快得多

把这些放在一起,你可以做到:

from Bio import SeqIO

with open('chr1.fa') as fd:
    gc_for_sequence=[]
    for seq in SeqIO.parse(fd, 'fasta'):
        gc = sum(seq.seq.count(base) for base in "GC")
        gc_for_sequence.append(gc / len(seq))

如果这还不够快,那么您可以使用 multiprocessing 模块,例如:

from Bio import SeqIO
from multiprocessing import Pool

def sequence_gc_prop(seq):
    return sum(seq.count(base) for base in "GC") / len(seq)

with open('chr1.fa') as fd, Pool() as pool:
    gc_for_sequence = pool.map(
        sequence_gc_prop,
        (seq.seq for seq in SeqIO.parse(fd, 'fasta')),
        chunksize=1000,
    )

来自 Lukasz 的 cmets 主要适用。其他不明显的东西:

奇怪的seq.seq for seq in… 是为了确保我们不会腌制不必要的数据 我将chunksize 设置为相当大的值,因为该函数应该很快,因此我们希望给子进程合理的工作量,这样父进程就不会花费所有时间来编排事务李>

【讨论】:

谢谢!那么,在 pool.map 函数(seq.seq for seq in SeqIO.parse(fd, 'fasta'))中的输入数据中,模块获取我文件的每一行并以并行方式计算 GC 内容?跨度> 应该做!您可以同时运行top 以确保它实际上在多个处理器上运行。猜测一下:这种任务不太适合并行运行,可以分配给每个处理器的有用工作量是有限的。这意味着主进程将花费大部分时间读取数据和协调。重新定义问题会有所帮助,例如一次处理多个文件

以上是关于如何在 fasta 文件中并行化计算,其中每个处理器采用一个序列的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:如何在 HDFS 中并行化多 gz 文件处理

python 在批量线程中运行可并行化的函数,等待每个批处理完成。这允许计算机的全部资源b

新手求助:用perl处理fasta文件

python文本处理---计算fasta文件中不同氨基酸的数目

如何在 fasta 文件的每个标题中附加文件名? [关闭]

perl处理fasta文件