python concurrent.futures.ProcessPoolExecutor的示例。使用“as_completed”获得无序结果的关键思路之一。如果是序列顺序
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python concurrent.futures.ProcessPoolExecutor的示例。使用“as_completed”获得无序结果的关键思路之一。如果是序列顺序相关的知识,希望对你有一定的参考价值。
import itertools
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
from .base_extractor import BaseExtractor
class ParallelExtractor(BaseExtractor):
POOL_SIZE = mp.cpu_count()
IMAGE_GROUP_SIZE = 512
def transform_frame_images(self, image_seq, **kwargs):
future_seq = self.image_group_future_seq(image_seq, **kwargs)
index_group_seq = self.future_result_seq(future_seq)
for _, group in sorted(index_group_seq):
for image in group:
yield image
def future_result_seq(self, future_seq):
future_list = list(future_seq)
future_seq = as_completed(future_list)
for future in future_seq:
yield future.result()
def image_group_future_seq(self, image_seq, **kwargs):
image_group_seq = self.image_group_seq(image_seq)
with ProcessPoolExecutor(self.POOL_SIZE) as executor:
for index, image_group in enumerate(image_group_seq):
# Serialization for submit to ProcessPoolExecutor.
image_list = list(image_group)
future = executor.submit(
self.local_transform_frame_images,
index,
image_list,
**kwargs
)
yield future
def local_transform_frame_images(self, index, image_list, **kwargs):
# Deserialization.
image_seq = iter(image_list)
image_seq = super(ParallelExtractor, self).transform_frame_images(image_seq, **kwargs)
image_list = list(image_seq)
return index, image_list
def image_group_seq(self, image_seq):
size = self.IMAGE_GROUP_SIZE
it = iter(image_seq)
group = list(itertools.islice(it, size))
while group:
yield group
group = list(itertools.islice(it, size))
# size = random.randint(32, 512)
以上是关于python concurrent.futures.ProcessPoolExecutor的示例。使用“as_completed”获得无序结果的关键思路之一。如果是序列顺序的主要内容,如果未能解决你的问题,请参考以下文章
python concurrent.futures
python的multiprocessing和concurrent.futures有啥区别?
Python:Concurrent.Futures 错误 [TypeError:'NoneType' 对象不可调用]
python并发模块之concurrent.futures
python简单粗暴多线程之concurrent.futures
Python:inotify、concurrent.futures - 如何添加现有文件