在 Python - 3.x 中创建可迭代的自定义对象后,有没有办法防止无限循环?

Posted

技术标签:

【中文标题】在 Python - 3.x 中创建可迭代的自定义对象后,有没有办法防止无限循环?【英文标题】:Is there a way to prevent the infinite loop after creating an iterable custom object in Python - 3.x? 【发布时间】:2019-12-17 10:13:44 【问题描述】:

我正在建立一个库来生成尽可能多的线程,并让它们完成分配给它们的任何任务。当对包含“n”个线程的对象的创建进行单元测试并尝试在对象级别迭代每个线程时,我遇到了一个无限循环。

我修复了竞态条件的酸洗问题,但是现在当尝试输出在 for 循环中创建了多少线程时,在调试时遇到了无限循环。

from __future__ import print_function

try:
    import sys
    from threading import Thread, Lock, current_thread
    from queue import Queue
except ImportError:
    raise ImportError


class Worker(Thread):
    """ Thread executing tasks from a given tasks queue """

    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                # Acquire locking mechanism for threads to prevent race condition
                Lock.acquire()
                func(*args, **kargs)
                # Release locking mechanism
                Lock.release()
            except Exception as e:
                # An exception happened in this thread
                raise e
            finally:
                if self.tasks is None:
                    # Mark process done once there are no more tasks to process
                    self.tasks.task_done()


class SpawnThreads:
    """Pool of threads consuming tasks from a queue."""

    def __init__(self, num_threads: int):
        self.tasks = Queue(num_threads)
        self.num_threads = num_threads
        for _ in range(self.num_threads):
            Worker(self.tasks)

    def __iter__(self):
        return self

    def __next__(self):
        next_value = 0
        while next_value < self.num_threads:
            try:
                next_value += 1
            except Exception:
                raise StopIteration
        return next_value

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue."""
        self.tasks.put((func, args, kargs))

    def task_list(self, func, args_list):
        """Add a list of tasks to the queue."""
        for args in args_list:
            self.add_task(func, args)

    def wait_completion(self):
        """.Wait for completion of all the tasks in the queue."""
        self.tasks.join()

    def get_qsize(self):
        """Return the approximate size of the queue."""
        return self.tasks.qsize()

    def get_current_thread(self):
        return current_thread()

这是评估线程生成对象的创建并迭代和访问每个单独线程的单元测试。

import pytest
import unittest
import SpawnThreads


@unittest
class TestThreadFactory(unittest.TestCase):

    def test_spawn_threads(self):
        workforce = SpawnThreads(5)

        self.assertIsNotNone(workforce)

        print(workforce)

        for w in workforce:
            print(w)

预期的输出应该是地址空间/对象,即每个线程(共 5 个)。

更具体地说,我想在控制台中看到这个结果 5 次:

<ThreadFactory.thread_factory.SpawnThreads object at 0x0000024E851F5208>

我得到的是无限返回的整数 5,而不是线程的 5 个地址。

【问题讨论】:

Build a Basic Python Iterator的可能重复 @krisz 我看到了,但我想打印我实际创建的对象数量,即使使用这种方法,我也会收到一个无限循环的 w 显示 int '5',但连续做同样的事情。 你不是说&lt;Worker(Thread-1, started daemon 140595349669632)&gt; 5 次吗?只有一个 SpawnThreads 对象。 @krisz 是的,这正是我想要的 【参考方案1】:

您的__next__ 定义上总是会返回5 并且永远不会结束。 __next__ 不是生成器函数,除了 self 上的内容之外,没有任何进入状态。所以你总是循环直到next_value(一个无状态的局部变量)等于self.num_threads(一个永远不会改变的值)并返回它;你的__next__ 可以简化为return self.num_threads,而StopIteration 永远不会被提升(因此是无限循环)。

如果你希望它返回不同的值(特别是你的每个工人),你需要状态:

class SpawnThreads:
    """Pool of threads consuming tasks from a queue."""

    def __init__(self, num_threads: int):
        self.tasks = Queue(num_threads)
        self.next_value = 0               # Initial next_value in instance state
        self.num_threads = num_threads
        # Store list of workers
        self.workers = [Worker(self.tasks) for _ in range(self.num_threads)]

    def __iter__(self):
        return self

    def __next__(self):
        # Check if iteration has finished
        if self.next_value >= self.num_threads:
            raise StopIteration
        retval = self.workers[self.next_value]  # Save value to return to local
        self.next_value += 1                    # Increment state for next use
        return retval                           # Return value

如果您真的很在意,可以用另一种巧妙的方法替换最后三行以避免局部变量:

        try:
            return self.workers[self.next_value]
        finally:
            self.next_value += 1

或者更好的是,您可以使用 Python 内置函数为您完成工作:

class SpawnThreads:
    """Pool of threads consuming tasks from a queue."""

    def __init__(self, num_threads: int):
        self.tasks = Queue(num_threads)

        self.num_threads = num_threads
        self.workers = [Worker(self.tasks) for _ in range(self.num_threads)]
        self.next_worker_iter = iter(self.workers) # Iterates workers

    def __iter__(self):
        return self

    def __next__(self):
        # Let the list iterator do the work of maintaining state,
        # raising StopIteration, etc.
        return next(self.next_worker_iter)

这种方法更简单、更快,并且作为奖励,线程安全,至少在 CPython 上是这样(如果两个线程迭代相同的 SpawnThreads 实例,每个工作线程将只生成一次,而不是可能生成的值跳过或重复)。

如果目标是制作一个可迭代的(可以迭代多次),而不是一个迭代器(可以从头到尾迭代一次,并且不再重复),最简单的解决方案是让__iter__本身返回一个迭代器,完全不需要__next__

class SpawnThreads:
    """Pool of threads consuming tasks from a queue."""

    def __init__(self, num_threads: int):
        self.tasks = Queue(num_threads)

        self.num_threads = num_threads
        self.workers = [Worker(self.tasks) for _ in range(self.num_threads)]

    def __iter__(self):
        # Makes this a generator function that produces each Worker once
        yield from self.workers
        # Alternatively:
        return iter(self.workers)
        # though that exposes more implementation details than anonymous generators

【讨论】:

【参考方案2】:

问题出在这个方法上:

def __next__(self):
    next_value = 0
    while next_value < self.num_threads:
        try:
            next_value += 1
        except Exception:
            raise StopIteration
    return next_value

基本一样

def __next__(self):
    return self.num_threads

正如您所见,没有任何迭代器状态,它将永远返回相同的数字。 next_value += 1 永远不会抛出异常,next_value 只是一个整数。

要实现您想要的,只需将线程存储在容器中并将迭代器返回到该容器即可。修改SpawnThreads

def __init__(self, num_threads: int):
    self.tasks = Queue(num_threads)
    self.num_threads = num_threads
    self.threads = []
    for _ in range(self.num_threads):
        self.threads.append(Worker(self.tasks));

def __iter__(self):
    return iter(self.threads)

# remove the __next__() method

【讨论】:

这对看似预期的行为进行了两项重大更改:1) 它返回 Worker 对象,而不是整数值,以及 2) 它使 SpawnThreads 成为 iterable,不是迭代器;如果您对实例进行两次循环,您将生成两次值,其中迭代器被迭代一次且仅一次。 1. OP希望返回对象(虽然不是 Thread 对象,但自从他修改问题以来,相同的 SpawnThreads 对象 5 次(这没有任何意义)) 2. SpawnThreads 从来都不是迭代器,因为迭代器几乎不会产生线程。

以上是关于在 Python - 3.x 中创建可迭代的自定义对象后,有没有办法防止无限循环?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 python kivy 中创建可滚动的 FloatLayout

在Python中创建可执行的符号链接?

在自定义视图单元中创建可绑定事件处理程序

如何在 uiimagepickercontroller 中创建可裁剪的横幅图像?

在Vue中创建可重用的 Transition

如何在 WPF 中创建可折叠面板