如何在 fasta 文件中并行化计算,其中每个处理器采用一个序列
Posted
技术标签:
【中文标题】如何在 fasta 文件中并行化计算,其中每个处理器采用一个序列【英文标题】:How I can parallelize computations in a fasta file, where each processor takes one sequence 【发布时间】:2019-06-06 15:43:59 【问题描述】:我不知道如何并行化 Python 中的代码,该代码获取 FASTA 文件的每一行并对其进行一些统计,例如计算 GC 内容。您是否有一些技巧或库可以帮助我减少执行时间?
我尝试使用 os.fork(),但它比顺序代码给了我更多的执行时间。可能是因为我不太清楚如何给每个孩子一个不同的顺序。
#Computing GC Content
from Bio import SeqIO
with open('chr1.fa', 'r') as f:
records = list (SeqIO.parse(f,'fasta'))
GC_for_sequence=[]
for i in records:
GC=0
for j in i:
if j in "GC":
GC+=1
GC_for_sequence.append(GC/len(i))
print(GC_for_sequence)
预期的执行是:每个进程采用一个序列,它们并行进行统计。
【问题讨论】:
【参考方案1】:这是标准multiprocessing 模块的一个想法:
from multiprocessing import Pool
import numpy as np
no_cores_to_use = 4
GC_for_sequence = [np.random.rand(100) for x in range(10)]
with Pool(no_cores_to_use) as pool:
result = pool.map(np.average, GC_for_sequence)
print(result)
在代码中,我使用numpy
模块来模拟包含一些内容的列表。 pool.map
将要在数据上使用的函数作为第一个参数,将数据列表作为第二个参数。您可以轻松定义自己的功能。默认情况下,它应该采用单个参数。如果您想通过更多,请使用functools.partial
。
[编辑] 这是一个更接近您的问题的示例:
from multiprocessing import Pool
import numpy as np
records = ['ACTGTCGCAGC' for x in range(10)]
no_cores_to_use = 4
def count(sequence):
count = sequence.count('GC')
return count
with Pool(no_cores_to_use) as pool:
result = pool.map(count, records)
print(sum(result))
【讨论】:
好的!但是如果我想放置大量不同的序列,我是否需要将它们放在一个列表中,就像您在记录中所做的那样? 从您的代码看来,您从一个列表 (list (SeqIO.parse(f,'fasta'))
) 开始,因此无需做任何额外的事情。
如果我将数据添加为标准输入并制作文件的“猫”(在外壳中),该模块将起作用吗?
你可以让它工作,但它会非常低效。检查这个答案:***.com/questions/7654971/… 简而言之,要么使用那里给出的显式生成器,要么使用迭代器 SeqIO.parse("filename", "fasta")
在你的代码中,你将生成器变成列表。别。只需将迭代器传递给map
。【参考方案2】:
关于您现有代码的几点说明:
我建议不要这样做:list (SeqIO.parse(…))
,因为这将暂停执行,直到所有序列都加载到内存中,你最好(内存和总执行时间)将其保留为迭代器和根据需要将元素消耗给工人
循环遍历每个字符非常慢,使用 str.count
会快得多
把这些放在一起,你可以做到:
from Bio import SeqIO
with open('chr1.fa') as fd:
gc_for_sequence=[]
for seq in SeqIO.parse(fd, 'fasta'):
gc = sum(seq.seq.count(base) for base in "GC")
gc_for_sequence.append(gc / len(seq))
如果这还不够快,那么您可以使用 multiprocessing
模块,例如:
from Bio import SeqIO
from multiprocessing import Pool
def sequence_gc_prop(seq):
return sum(seq.count(base) for base in "GC") / len(seq)
with open('chr1.fa') as fd, Pool() as pool:
gc_for_sequence = pool.map(
sequence_gc_prop,
(seq.seq for seq in SeqIO.parse(fd, 'fasta')),
chunksize=1000,
)
来自 Lukasz 的 cmets 主要适用。其他不明显的东西:
奇怪的seq.seq for seq in…
是为了确保我们不会腌制不必要的数据
我将chunksize
设置为相当大的值,因为该函数应该很快,因此我们希望给子进程合理的工作量,这样父进程就不会花费所有时间来编排事务李>
【讨论】:
谢谢!那么,在 pool.map 函数(seq.seq for seq in SeqIO.parse(fd, 'fasta'))中的输入数据中,模块获取我文件的每一行并以并行方式计算 GC 内容?跨度> 应该做!您可以同时运行top
以确保它实际上在多个处理器上运行。猜测一下:这种任务不太适合并行运行,可以分配给每个处理器的有用工作量是有限的。这意味着主进程将花费大部分时间读取数据和协调。重新定义问题会有所帮助,例如一次处理多个文件以上是关于如何在 fasta 文件中并行化计算,其中每个处理器采用一个序列的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:如何在 HDFS 中并行化多 gz 文件处理
python 在批量线程中运行可并行化的函数,等待每个批处理完成。这允许计算机的全部资源b