执行定期动作[重复]

Posted

技术标签:

【中文标题】执行定期动作[重复]【英文标题】:Executing periodic actions [duplicate] 【发布时间】:2012-01-25 21:00:29 【问题描述】:

我在 Windows 上工作。我想每 10 秒执行一次函数 foo()。 我该怎么做?

【问题讨论】:

【参考方案1】:

这将在每次调用 foo() 之间插入 10 秒的睡眠时间,这大约是您在调用快速完成时所要求的时间。

import time

while True:
    foo()
    time.sleep(10)

在后台线程中调用 foo() 时做其他事情

import time
import sys
import threading

def foo():
    sys.stdout.write('() foo\n'.format(time.ctime()))

def foo_target():
    while True:
        foo()
        time.sleep(10)

t = threading.Thread(target=foo_target)
t.daemon = True
t.start()
print('doing other things...')

【讨论】:

我也想在等待的时候做其他事情。有什么方法可以使用信号吗? 如果你的 foo() 需要未知的时间来完成,你会想要产生一个线程来每 10 秒执行一次 foo(),如果需要,我可以告诉你如何做到这一点。 foo 只是一个快速调用,还是需要几秒钟才能完成? 需要一些时间才能完成【参考方案2】:

如果你打算每 10 秒在 python 脚本中运行 foo(),你可以在这些行上做一些事情。

import time

def foo():
    print "Howdy"

while True:
    foo()
    time.sleep(10)

【讨论】:

【参考方案3】:

foo() 的末尾,创建一个Timer,它会在10 秒后调用foo()。 因为,Timer 新建一个thread 来调用foo()。 您可以在不被阻止的情况下做其他事情。

import time, threading
def foo():
    print(time.ctime())
    threading.Timer(10, foo).start()

foo()

#output:
#Thu Dec 22 14:46:08 2011
#Thu Dec 22 14:46:18 2011
#Thu Dec 22 14:46:28 2011
#Thu Dec 22 14:46:38 2011

【讨论】:

这里要注意的一点是开始时间“漂移”。我刚刚进行了一次测试,在大约 33 次迭代中,我的时间漂移​​了 +0.05 秒。我进行了 1 秒的民意调查,这意味着在不到一分钟的时间内漂移了 20%。您可以通过在函数开头而不是结尾调用threading.Timer减少 漂移,但前提是它是函数持续时间导致您的漂移,而不是计时器不可靠性。减少漂移的最佳方法是只在需要的时间内休眠,直到下一个预期的运行时间。我将添加一个示例作为另一个答案。 这也有在每个周期实例化一个新对象(在一个新线程中!)的开销。我找不到这个问题的真正好的解决方案,但我考虑了一下,并很快在下面发布了一个使用生成器的答案 这里的内存使用情况如何?感觉就像一个无限递归调用,是吗? 这个解决方案有点脆弱。由有效负载(在这种情况下为print)引发的任何未捕获的异常(例如IOError)将导致整个调度终止。我更喜欢一种解决方案,它可以更优雅地处理此类事情,并在异常原因(例如,完整的磁盘)修复后恢复到原始行为。 随着定时器线程的数量不断增加,您将如何杀死它们?【参考方案4】:

这是使用 Thread 类的一个很好的实现:http://g-off.net/software/a-python-repeatable-threadingtimer-class

下面的代码更快更脏:

from threading import Timer
from time import sleep

def hello():
    print "hello, world"
    t = Timer(3,hello)
    t.start()

t = Timer(3, hello)
t.start() # after 3 seconds, "hello, world" will be printed

# timer will wake up ever 3 seconds, while we do something else
while True:
    print "do something else"
    sleep(10)

【讨论】:

【参考方案5】:

您可以在不同的线程中执行您的任务。 threading.Timer 会让你在经过一段时间后执行给定的回调一次,例如,如果你想执行你的任务,只要回调返回 True (这实际上是 glib.timeout_add 提供的,但你可能没有安装在 Windows 中)或直到你取消它,你可以使用这个代码:

import logging, threading, functools
import time

logging.basicConfig(level=logging.NOTSET,
                    format='%(threadName)s %(message)s')

class PeriodicTimer(object):
    def __init__(self, interval, callback):
        self.interval = interval

        @functools.wraps(callback)
        def wrapper(*args, **kwargs):
            result = callback(*args, **kwargs)
            if result:
                self.thread = threading.Timer(self.interval,
                                              self.callback)
                self.thread.start()

        self.callback = wrapper

    def start(self):
        self.thread = threading.Timer(self.interval, self.callback)
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


def foo():
    logging.info('Doing some work...')
    return True

timer = PeriodicTimer(1, foo)
timer.start()

for i in range(2):
    time.sleep(2)
    logging.info('Doing some other work...')

timer.cancel()

示例输出:

Thread-1 Doing some work...
Thread-2 Doing some work...
MainThread Doing some other work...
Thread-3 Doing some work...
Thread-4 Doing some work...
MainThread Doing some other work...

注意:回调不会在每个间隔执行时执行。间隔是线程在上次回调完成和下一次调用之间等待的时间。

【讨论】:

【参考方案6】:

也许sched module 会满足您的需求。

或者,考虑使用Timer object。

【讨论】:

sched 模块是最灵活的方法。感谢您的链接。【参考方案7】:

仅休眠 10 秒或使用threading.Timer(10,foo) 会导致开始时间漂移。 (您可能不关心这一点,或者根据您的具体情况,它可能是一个重要的问题来源。)这可能有两个原因 - 线程唤醒时间或函数执行时间不准确。

您可以在本文末尾看到一些结果,但首先是一个如何修复它的示例。您需要跟踪下一次调用函数的时间,而不是实际调用函数的时间,并说明差异。

这是一个略有偏差的版本:

import datetime, threading

def foo():
    print datetime.datetime.now()
    threading.Timer(1, foo).start()

foo()

它的输出如下所示:

2013-08-12 13:05:36.483580
2013-08-12 13:05:37.484931
2013-08-12 13:05:38.485505
2013-08-12 13:05:39.486945
2013-08-12 13:05:40.488386
2013-08-12 13:05:41.489819
2013-08-12 13:05:42.491202
2013-08-12 13:05:43.492486
2013-08-12 13:05:44.493865
2013-08-12 13:05:45.494987
2013-08-12 13:05:46.496479
2013-08-12 13:05:47.497824
2013-08-12 13:05:48.499286
2013-08-12 13:05:49.500232

您可以看到亚秒计数在不断增加,因此开始时间是“漂移”的。

这是正确解释漂移的代码:

import datetime, threading, time

next_call = time.time()

def foo():
  global next_call
  print datetime.datetime.now()
  next_call = next_call+1
  threading.Timer( next_call - time.time(), foo ).start()

foo()

它的输出如下所示:

2013-08-12 13:21:45.292565
2013-08-12 13:21:47.293000
2013-08-12 13:21:48.293939
2013-08-12 13:21:49.293327
2013-08-12 13:21:50.293883
2013-08-12 13:21:51.293070
2013-08-12 13:21:52.293393

在这里你可以看到亚秒级的时间不再有任何增加。

如果您的事件非常频繁地发生,您可能希望在单个线程中运行计时器,而不是为每个事件启动一个新线程。在考虑漂移时,这看起来像:

import datetime, threading, time

def foo():
    next_call = time.time()
    while True:
        print datetime.datetime.now()
        next_call = next_call+1;
        time.sleep(next_call - time.time())

timerThread = threading.Thread(target=foo)
timerThread.start()

但是您的应用程序不会正常退出,您需要终止计时器线程。如果你想在你的应用程序完成后正常退出,而不是手动杀死线程,你应该使用

timerThread = threading.Thread(target=foo)
timerThread.daemon = True
timerThread.start()

【讨论】:

为每个调用创建一个线程似乎很浪费。 You could do it in a single thread @J.F.Sebastian 同意,这主要是作为对最高投票答案的扩展而实现的。线程开销通常很小,但是如果您的操作很频繁,您需要做一些不同的事情 - 作为单个线程运行操作是一个微不足道(但通常很重要)的扩展,一些系统还使用专用数据结构,以便许多事件可以安排在一个线程上(这不是那么简单)。 +1,LGTM。如果你想要to stop calling the function before the process exits, you could use threading.Event(). @KiaMorot 在工作需要更长的时间来处理它的情况下,这并不是一个好的解决方案。在这种情况下,使用max(0, next_call - time.time()) 作为睡眠的参数,那么您至少会立即重新启动。 @MichaelAnderson 我们如何将这些等待时间用于其他工作?【参考方案8】:

这是一个简单的基于单线程睡眠的版本,它会漂移,但会在检测到漂移时尝试自动更正。

注意:这只有在满足以下 3 个合理假设时才有效:

    时间周期远大于被执行函数的执行时间 正在执行的函数每次调用所用的时间大致相同 调用之间的偏差量小于一秒

-

from datetime import timedelta
from datetime import datetime

def exec_every_n_seconds(n,f):
    first_called=datetime.now()
    f()
    num_calls=1
    drift=timedelta()
    time_period=timedelta(seconds=n)
    while 1:
        time.sleep(n-drift.microseconds/1000000.0)
        current_time = datetime.now()
        f()
        num_calls += 1
        difference = current_time - first_called
        drift = difference - time_period* num_calls
        print "drift=",drift

【讨论】:

+1 用于补偿漂移的单线程版本。 Here're couple of similar code examples 注意num_calls要初始化为0,而不是1,否则time.sleep会出现异常,因为它的参数可以为负数。【参考方案9】:

很惊讶没有找到使用生成器进行计时的解决方案。我只是为自己的目的设计了这个。

此解决方案:单线程,每个周期没有对象实例化,多次使用生成器,在精确到 time 模块的计时精度上坚如磐石(与我从堆栈交换中尝试过的几种解决方案不同)。

注意:对于 Python 2.x,将下面的 next(g) 替换为 g.next()

import time

def do_every(period,f,*args):
    def g_tick():
        t = time.time()
        while True:
            t += period
            yield max(t - time.time(),0)
    g = g_tick()
    while True:
        time.sleep(next(g))
        f(*args)

def hello(s):
    print('hello  (:.4f)'.format(s,time.time()))
    time.sleep(.3)

do_every(1,hello,'foo')

导致,例如:

hello foo (1421705487.5811)
hello foo (1421705488.5811)
hello foo (1421705489.5809)
hello foo (1421705490.5830)
hello foo (1421705491.5803)
hello foo (1421705492.5808)
hello foo (1421705493.5811)
hello foo (1421705494.5811)
hello foo (1421705495.5810)
hello foo (1421705496.5811)
hello foo (1421705497.5810)
hello foo (1421705498.5810)
hello foo (1421705499.5809)
hello foo (1421705500.5811)
hello foo (1421705501.5811)
hello foo (1421705502.5811)
hello foo (1421705503.5810)

请注意,此示例包括模拟 CPU 在每个周期内执行其他操作 0.3 秒。如果你每次都把它改成随机的,那就没关系了。 yield 行中的最大值用于保护 sleep 免受负数的影响,以防被调用的函数花费的时间超过指定的时间。在这种情况下,它会立即执行,并在下一次执行的时间弥补损失的时间。

【讨论】:

在 Python 3.x 下,time.sleep(g.next()) 不起作用。切换到 time.sleep(next(g)) 就可以了。 我正在尝试在 Pi 上获取真实世界测量值的精确样本。我是信号处理老手。这是正确的解决方案。 这很整洁。与 asyncio 很好地配合执行多项任务。 如此优雅以至于我很惊讶没有提供这种行为的内置插件。感谢分享! 你可以像这样简化代码以消除计数变量def do_every(period,f,*args): def g_tick(): t = time.time() while True: t += period yield max(t - time.time(),0) g = g_tick() while True: time.sleep(next(g)) f(*args)

以上是关于执行定期动作[重复]的主要内容,如果未能解决你的问题,请参考以下文章

watch:定期重复Linux / Unix命令

如何在Vim上重复某些动作?

显示重复的动作出口

如何安排脚本定期运行?

定期删除重复的单元格

android定期重复警报不起作用