制作计时器:threading.Event.wait 的超时不准确 - Python 3.6

Posted

技术标签:

【中文标题】制作计时器:threading.Event.wait 的超时不准确 - Python 3.6【英文标题】:Making a timer: timeout inaccuracy of threading.Event.wait - Python 3.6 【发布时间】:2018-08-05 16:10:19 【问题描述】:

首先,我是 Python 新手,不熟悉它的功能。我一直主要使用MATLAB。

PC 简要规格:Windows 10、Intel i7

我正在尝试制作一个计时器类,用于定期执行诸如 MATLAB 具有的函数,这显然是从 Java 计时器借来的。 MATLAB 计时器的分辨率约为 1 毫秒,我从未见过它在任何情况下都超过 2 毫秒。其实对于我的项目来说已经足够准确了。

最近,由于 MATLAB 的并行计算和 Web 访问功能较差,我计划转向 Python。然而,不幸的是,与我必须制作自己的定时器类的 MATLAB 相比,Python 的标准包提供了一些低级别的定时器(threading.Timer)。首先,我提到了 QnA Executing periodic actions in Python [duplicate]。 Michael Anderson 建议的解决方案给出了漂移校正的简单概念。他使用 time.sleep() 来保持期间。该方法非常准确,有时比 MATLAB 计时器显示出更好的准确性。大约0.5 毫秒分辨率。但是,定时器在 time.sleep() 中被捕获期间不能被中断(暂停或恢复)。但我有时不得不立即停止,不管它是否处于 sleep() 中。

我发现的问题的解决方案是利用线程包中的 Event 类。请参阅Python threading.timer - repeat function every 'n' seconds 。使用 Event.wait() 的超时功能,我可以在执行之间设置一个时间间隔,它用于保持时间段。也就是说,事件通常会被清除,因此 wait(timeout) 可以像 time.sleep(interval) 一样起作用,并且我可以在需要时通过设置事件立即退出 wait()。

当时一切似乎都很好,但 Event.wait() 中存在一个严重问题。时间延迟在 1 ~ 15 ms 之间变化太大。我认为它来自 Event.wait() 的开销。

我做了一个示例代码,显示 time.sleep() 和 Event.wait() 之间的准确性比较。这总计 1 毫秒 sleep() 和 wait() 的 1000 次迭代,以查看累积的时间误差。预期结果约为 1.000。

import time
from threading import Event

time.sleep(3)  # to relax

# time.sleep()
tspan = 1
N = 1000
t1 = time.perf_counter()
for _ in range(N):
    time.sleep(tspan/N)
t2 = time.perf_counter()

print(t2-t1)

time.sleep(3)  # to relax

# Event.wait()    
tspan = 1
event = Event()
t1 = time.perf_counter()
for _ in range(N):
    event.wait(tspan/N)
t2 = time.perf_counter()

print(t2-t1)

结果:

1.1379848184879964
15.614547161211096

结果表明 time.sleep() 的准确性要好得多。但我不能像前面提到的那样纯粹依赖 time.sleep()。

总之,

time.sleep():准确但不可中断 threading.Event.wait():不准确但可中断

我目前正在考虑妥协:就像在示例中一样,创建一个微小的 time.sleep() 循环(间隔为 0.5 毫秒)并使用 if 语句和 break 退出循环需要的时候。据我所知,该方法用于Python 2.x Python time.sleep() vs event.wait()。

这是一个冗长的介绍,但我的问题可以总结如下。

    我能否通过外部信号或事件强制线程进程从 time.sleep() 中断? (这似乎是最有效的。???)

    使 Event.wait() 更准确或减少开销时间。

    除了 sleep() 和 Event.wait() 方法之外,还有没有更好的方法来提高计时精度。

非常感谢。

【问题讨论】:

【参考方案1】:

我遇到了与Event.wait() 相同的时间问题。我想出的解决方案是创建一个模仿threading.Event 的类。在内部,它使用time.sleep() 循环和繁忙循环的组合来大大提高精度。睡眠循环在单独的线程中运行,因此主线程中的阻塞 wait() 调用仍然可以立即中断。当set() 方法被调用时,睡眠线程应该在不久之后终止。另外,为了尽量减少 CPU 使用率,我确保繁忙循环的运行时间永远不会超过 3 毫秒。

这是我的自定义 Event 类以及最后的计时演示(演示中打印的执行时间将以纳秒为单位):

import time
import _thread
import datetime


class Event:
    __slots__ = (
        "_flag", "_lock", "_nl",
        "_pc", "_waiters"
    )

    _lock_type = _thread.LockType
    _timedelta = datetime.timedelta
    _perf_counter = time.perf_counter
    _new_lock = _thread.allocate_lock

    class _switch:
        __slots__ = ("_on",)

        def __call__(self, on: bool = None):
            if on is None:
                return self._on

            self._on = on

        def __bool__(self):
            return self._on

        def __init__(self):
            self._on = False

    def clear(self):
        with self._lock:
            self._flag(False)

    def is_set(self) -> bool:
        return self._flag()

    def set(self):
        with self._lock:
            self._flag(True)
            waiters = self._waiters

            for waiter in waiters:
                waiter.release()

            waiters.clear()

    def wait(
        self,
        timeout: float = None
    ) -> bool:
        with self._lock:
            return self._wait(self._pc(), timeout)

    def _new_waiter(self) -> _lock_type:
        waiter = self._nl()
        waiter.acquire()
        self._waiters.append(waiter)
        return waiter

    def _wait(
        self,
        start: float,
        timeout: float,
        td=_timedelta,
        pc=_perf_counter,
        end: _timedelta = None,
        waiter: _lock_type = None,
        new_thread=_thread.start_new_thread,
        thread_delay=_timedelta(milliseconds=3)
    ) -> bool:
        flag = self._flag

        if flag:
            return True
        elif timeout is None:
            waiter = self._new_waiter()
        elif timeout <= 0:
            return False
        else:
            delay = td(seconds=timeout)
            end = td(seconds=start) + delay

            if delay > thread_delay:
                mark = end - thread_delay
                waiter = self._new_waiter()
                new_thread(
                    self._wait_thread,
                    (flag, mark, waiter)
                )

        lock = self._lock
        lock.release()

        try:
            if waiter:
                waiter.acquire()

            if end:
                while (
                    not flag and
                    td(seconds=pc()) < end
                ):
                    pass

        finally:
            lock.acquire()

            if waiter and not flag:
                self._waiters.remove(waiter)

        return flag()

    @staticmethod
    def _wait_thread(
        flag: _switch,
        mark: _timedelta,
        waiter: _lock_type,
        td=_timedelta,
        pc=_perf_counter,
        sleep=time.sleep
    ):
        while not flag and td(seconds=pc()) < mark:
            sleep(0.001)

        if waiter.locked():
            waiter.release()

    def __new__(cls):
        _new_lock = cls._new_lock
        _self = object.__new__(cls)
        _self._waiters = []
        _self._nl = _new_lock
        _self._lock = _new_lock()
        _self._flag = cls._switch()
        _self._pc = cls._perf_counter
        return _self


if __name__ == "__main__":
    def test_wait_time():
        wait_time = datetime.timedelta(microseconds=1)
        wait_time = wait_time.total_seconds()

        def test(
            event=Event(),
            delay=wait_time,
            pc=time.perf_counter
        ):
            pc1 = pc()
            event.wait(delay)
            pc2 = pc()
            pc1, pc2 = [
                int(nbr * 1000000000)
                for nbr in (pc1, pc2)
            ]
            return pc2 - pc1

        lst = [
            f"i.\t\ttest()"
            for i in range(1, 11)
        ]
        print("\n".join(lst))

    test_wait_time()
    del test_wait_time

【讨论】:

【参考方案2】:

Chris D 的自定义 Event 类效果非常好!出于实际目的,我将它包含在一个可安装的包中(https://github.com/ovinc/oclock,使用pip install oclock 安装),其中还包括其他计时工具。从 oclock 的 1.3.0 版起,可以使用 Chris D 的回答中讨论的自定义 Event 类,例如

from oclock import Event
event = Event()
event.wait(1)

使用Event 类的常用set()clear()is_set()wait() 方法。

计时精度比threading.Event 好得多,尤其是在Windows 中。例如,在具有 1000 个重复循环的 Windows 机器上,threading.Event 循环持续时间的标准偏差为 7 毫秒,oclock.Event 循环持续时间小于 0.01 毫秒。克里斯 D 的道具!

注意oclock 包在 GPLv3 许可下与 *** 的 CC BY-SA 4.0 兼容。

【讨论】:

【参考方案3】:

感谢您提供此主题和所有答案。我也遇到了一些时间不准确的问题(Windows 10 + Python 3.9 + Threading)。

解决方案是使用oclock 包,并通过wres 包(临时)更改Windows 系统计时器的分辨率。此软件包使用未记录的 Windows API 函数 NtSetTimerResolution(警告:分辨率已在系统范围内更改)。

只应用oclock包并不能解决问题。

应用这两个 python 包后,下面的代码可以正确且准确地安排定期事件。如果终止,则恢复原始计时器分辨率。

import threading
import datetime
import time
import oclock
import wres

class Job(threading.Thread):
    def __init__(self, interval, *args, **kwargs):
        threading.Thread.__init__(self)
        # use oclock.Event() instead of threading.Event()
        self.stopped = oclock.Event()
        self.interval = interval.total_seconds()
        self.args = args
        self.kwargs = kwargs

    def stop(self):
        self.stopped.set()
        self.join()

    def run(self):
        prevTime = time.time()
        while not self.stopped.wait(self.interval):
            now = time.time()
            print(now - prevTime)
            prevTime = now

# Set system timer resolution to 1 ms
# Automatically restore previous resolution when exit with statement
with wres.set_resolution(10000):
    # Create thread with periodic task called every 10 ms
    job = Job(interval=datetime.timedelta(seconds=0.010))
    job.start()

    try:
        while True:
            time.sleep(1)
    # Hit Ctrl+C to terminate main loop and spawned thread
    except KeyboardInterrupt:
        job.stop()

【讨论】:

以上是关于制作计时器:threading.Event.wait 的超时不准确 - Python 3.6的主要内容,如果未能解决你的问题,请参考以下文章

Ipython Widgets(如何制作计时器)

flash倒计时器制作

SwiftUI - 如何制作启动/停止计时器

使用按钮和计时器制作报价滚筒

如何在python中制作计时器而不冻结整个代码

如何使用 Swing 制作方法,使用计时器睡眠