python concurrent.futures.ProcessPoolExecutor的示例。使用“as_completed”获得无序结果的关键思路之一。如果是序列顺序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python concurrent.futures.ProcessPoolExecutor的示例。使用“as_completed”获得无序结果的关键思路之一。如果是序列顺序相关的知识,希望对你有一定的参考价值。

import itertools
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed

from .base_extractor import BaseExtractor

class ParallelExtractor(BaseExtractor):
  
    POOL_SIZE = mp.cpu_count()
    IMAGE_GROUP_SIZE = 512

    def transform_frame_images(self, image_seq, **kwargs):
        future_seq = self.image_group_future_seq(image_seq, **kwargs)
        index_group_seq = self.future_result_seq(future_seq)
        for _, group in sorted(index_group_seq):
            for image in group:
                yield image

    def future_result_seq(self, future_seq):
        future_list = list(future_seq)
        future_seq = as_completed(future_list)
        for future in future_seq:
            yield future.result()

    def image_group_future_seq(self, image_seq, **kwargs):
        image_group_seq = self.image_group_seq(image_seq)
        with ProcessPoolExecutor(self.POOL_SIZE) as executor:
            for index, image_group in enumerate(image_group_seq):
                # Serialization for submit to ProcessPoolExecutor.
                image_list = list(image_group)
                future = executor.submit(
                    self.local_transform_frame_images,
                    index,
                    image_list,
                    **kwargs
                )
                yield future

    def local_transform_frame_images(self, index, image_list, **kwargs):
        # Deserialization.
        image_seq = iter(image_list)
        image_seq = super(ParallelExtractor, self).transform_frame_images(image_seq, **kwargs)
        image_list = list(image_seq)
        return index, image_list

    def image_group_seq(self, image_seq):
        size = self.IMAGE_GROUP_SIZE
        it = iter(image_seq)
        group = list(itertools.islice(it, size))
        while group:
            yield group
            group = list(itertools.islice(it, size))
            # size = random.randint(32, 512)

以上是关于python concurrent.futures.ProcessPoolExecutor的示例。使用“as_completed”获得无序结果的关键思路之一。如果是序列顺序的主要内容,如果未能解决你的问题,请参考以下文章

python concurrent.futures

python的multiprocessing和concurrent.futures有啥区别?

Python:Concurrent.Futures 错误 [TypeError:'NoneType' 对象不可调用]

python并发模块之concurrent.futures

python简单粗暴多线程之concurrent.futures

Python:inotify、concurrent.futures - 如何添加现有文件