制作计时器:threading.Event.wait 的超时不准确 - Python 3.6
Posted
技术标签:
【中文标题】制作计时器:threading.Event.wait 的超时不准确 - Python 3.6【英文标题】:Making a timer: timeout inaccuracy of threading.Event.wait - Python 3.6 【发布时间】:2018-08-05 16:10:19 【问题描述】:首先,我是 Python 新手,不熟悉它的功能。我一直主要使用MATLAB。
PC 简要规格:Windows 10、Intel i7
我正在尝试制作一个计时器类,用于定期执行诸如 MATLAB 具有的函数,这显然是从 Java 计时器借来的。 MATLAB 计时器的分辨率约为 1 毫秒,我从未见过它在任何情况下都超过 2 毫秒。其实对于我的项目来说已经足够准确了。
最近,由于 MATLAB 的并行计算和 Web 访问功能较差,我计划转向 Python。然而,不幸的是,与我必须制作自己的定时器类的 MATLAB 相比,Python 的标准包提供了一些低级别的定时器(threading.Timer)。首先,我提到了 QnA Executing periodic actions in Python [duplicate]。 Michael Anderson 建议的解决方案给出了漂移校正的简单概念。他使用 time.sleep() 来保持期间。该方法非常准确,有时比 MATLAB 计时器显示出更好的准确性。大约0.5 毫秒分辨率。但是,定时器在 time.sleep() 中被捕获期间不能被中断(暂停或恢复)。但我有时不得不立即停止,不管它是否处于 sleep() 中。
我发现的问题的解决方案是利用线程包中的 Event 类。请参阅Python threading.timer - repeat function every 'n' seconds 。使用 Event.wait() 的超时功能,我可以在执行之间设置一个时间间隔,它用于保持时间段。也就是说,事件通常会被清除,因此 wait(timeout) 可以像 time.sleep(interval) 一样起作用,并且我可以在需要时通过设置事件立即退出 wait()。
当时一切似乎都很好,但 Event.wait() 中存在一个严重问题。时间延迟在 1 ~ 15 ms 之间变化太大。我认为它来自 Event.wait() 的开销。
我做了一个示例代码,显示 time.sleep() 和 Event.wait() 之间的准确性比较。这总计 1 毫秒 sleep() 和 wait() 的 1000 次迭代,以查看累积的时间误差。预期结果约为 1.000。
import time
from threading import Event
time.sleep(3) # to relax
# time.sleep()
tspan = 1
N = 1000
t1 = time.perf_counter()
for _ in range(N):
time.sleep(tspan/N)
t2 = time.perf_counter()
print(t2-t1)
time.sleep(3) # to relax
# Event.wait()
tspan = 1
event = Event()
t1 = time.perf_counter()
for _ in range(N):
event.wait(tspan/N)
t2 = time.perf_counter()
print(t2-t1)
结果:
1.1379848184879964
15.614547161211096
结果表明 time.sleep() 的准确性要好得多。但我不能像前面提到的那样纯粹依赖 time.sleep()。
总之,
time.sleep():准确但不可中断 threading.Event.wait():不准确但可中断我目前正在考虑妥协:就像在示例中一样,创建一个微小的 time.sleep() 循环(间隔为 0.5 毫秒)并使用 if 语句和 break 退出循环需要的时候。据我所知,该方法用于Python 2.x Python time.sleep() vs event.wait()。
这是一个冗长的介绍,但我的问题可以总结如下。
我能否通过外部信号或事件强制线程进程从 time.sleep() 中断? (这似乎是最有效的。???)
使 Event.wait() 更准确或减少开销时间。
除了 sleep() 和 Event.wait() 方法之外,还有没有更好的方法来提高计时精度。
非常感谢。
【问题讨论】:
【参考方案1】:我遇到了与Event.wait()
相同的时间问题。我想出的解决方案是创建一个模仿threading.Event
的类。在内部,它使用time.sleep()
循环和繁忙循环的组合来大大提高精度。睡眠循环在单独的线程中运行,因此主线程中的阻塞 wait()
调用仍然可以立即中断。当set()
方法被调用时,睡眠线程应该在不久之后终止。另外,为了尽量减少 CPU 使用率,我确保繁忙循环的运行时间永远不会超过 3 毫秒。
这是我的自定义 Event
类以及最后的计时演示(演示中打印的执行时间将以纳秒为单位):
import time
import _thread
import datetime
class Event:
__slots__ = (
"_flag", "_lock", "_nl",
"_pc", "_waiters"
)
_lock_type = _thread.LockType
_timedelta = datetime.timedelta
_perf_counter = time.perf_counter
_new_lock = _thread.allocate_lock
class _switch:
__slots__ = ("_on",)
def __call__(self, on: bool = None):
if on is None:
return self._on
self._on = on
def __bool__(self):
return self._on
def __init__(self):
self._on = False
def clear(self):
with self._lock:
self._flag(False)
def is_set(self) -> bool:
return self._flag()
def set(self):
with self._lock:
self._flag(True)
waiters = self._waiters
for waiter in waiters:
waiter.release()
waiters.clear()
def wait(
self,
timeout: float = None
) -> bool:
with self._lock:
return self._wait(self._pc(), timeout)
def _new_waiter(self) -> _lock_type:
waiter = self._nl()
waiter.acquire()
self._waiters.append(waiter)
return waiter
def _wait(
self,
start: float,
timeout: float,
td=_timedelta,
pc=_perf_counter,
end: _timedelta = None,
waiter: _lock_type = None,
new_thread=_thread.start_new_thread,
thread_delay=_timedelta(milliseconds=3)
) -> bool:
flag = self._flag
if flag:
return True
elif timeout is None:
waiter = self._new_waiter()
elif timeout <= 0:
return False
else:
delay = td(seconds=timeout)
end = td(seconds=start) + delay
if delay > thread_delay:
mark = end - thread_delay
waiter = self._new_waiter()
new_thread(
self._wait_thread,
(flag, mark, waiter)
)
lock = self._lock
lock.release()
try:
if waiter:
waiter.acquire()
if end:
while (
not flag and
td(seconds=pc()) < end
):
pass
finally:
lock.acquire()
if waiter and not flag:
self._waiters.remove(waiter)
return flag()
@staticmethod
def _wait_thread(
flag: _switch,
mark: _timedelta,
waiter: _lock_type,
td=_timedelta,
pc=_perf_counter,
sleep=time.sleep
):
while not flag and td(seconds=pc()) < mark:
sleep(0.001)
if waiter.locked():
waiter.release()
def __new__(cls):
_new_lock = cls._new_lock
_self = object.__new__(cls)
_self._waiters = []
_self._nl = _new_lock
_self._lock = _new_lock()
_self._flag = cls._switch()
_self._pc = cls._perf_counter
return _self
if __name__ == "__main__":
def test_wait_time():
wait_time = datetime.timedelta(microseconds=1)
wait_time = wait_time.total_seconds()
def test(
event=Event(),
delay=wait_time,
pc=time.perf_counter
):
pc1 = pc()
event.wait(delay)
pc2 = pc()
pc1, pc2 = [
int(nbr * 1000000000)
for nbr in (pc1, pc2)
]
return pc2 - pc1
lst = [
f"i.\t\ttest()"
for i in range(1, 11)
]
print("\n".join(lst))
test_wait_time()
del test_wait_time
【讨论】:
【参考方案2】:Chris D 的自定义 Event 类效果非常好!出于实际目的,我将它包含在一个可安装的包中(https://github.com/ovinc/oclock,使用pip install oclock
安装),其中还包括其他计时工具。从 oclock
的 1.3.0 版起,可以使用 Chris D 的回答中讨论的自定义 Event
类,例如
from oclock import Event
event = Event()
event.wait(1)
使用Event
类的常用set()
、clear()
、is_set()
、wait()
方法。
计时精度比threading.Event
好得多,尤其是在Windows 中。例如,在具有 1000 个重复循环的 Windows 机器上,threading.Event
循环持续时间的标准偏差为 7 毫秒,oclock.Event
循环持续时间小于 0.01 毫秒。克里斯 D 的道具!
注意:oclock
包在 GPLv3 许可下与 *** 的 CC BY-SA 4.0 兼容。
【讨论】:
【参考方案3】:感谢您提供此主题和所有答案。我也遇到了一些时间不准确的问题(Windows 10 + Python 3.9 + Threading)。
解决方案是使用oclock 包,并通过wres 包(临时)更改Windows 系统计时器的分辨率。此软件包使用未记录的 Windows API 函数 NtSetTimerResolution(警告:分辨率已在系统范围内更改)。
只应用oclock包并不能解决问题。
应用这两个 python 包后,下面的代码可以正确且准确地安排定期事件。如果终止,则恢复原始计时器分辨率。
import threading
import datetime
import time
import oclock
import wres
class Job(threading.Thread):
def __init__(self, interval, *args, **kwargs):
threading.Thread.__init__(self)
# use oclock.Event() instead of threading.Event()
self.stopped = oclock.Event()
self.interval = interval.total_seconds()
self.args = args
self.kwargs = kwargs
def stop(self):
self.stopped.set()
self.join()
def run(self):
prevTime = time.time()
while not self.stopped.wait(self.interval):
now = time.time()
print(now - prevTime)
prevTime = now
# Set system timer resolution to 1 ms
# Automatically restore previous resolution when exit with statement
with wres.set_resolution(10000):
# Create thread with periodic task called every 10 ms
job = Job(interval=datetime.timedelta(seconds=0.010))
job.start()
try:
while True:
time.sleep(1)
# Hit Ctrl+C to terminate main loop and spawned thread
except KeyboardInterrupt:
job.stop()
【讨论】:
以上是关于制作计时器:threading.Event.wait 的超时不准确 - Python 3.6的主要内容,如果未能解决你的问题,请参考以下文章