如何在 Python 中每 60 秒异步执行一次函数?
Posted
技术标签:
【中文标题】如何在 Python 中每 60 秒异步执行一次函数?【英文标题】:How to execute a function asynchronously every 60 seconds in Python? 【发布时间】:2011-01-14 10:55:09 【问题描述】:我想在 Python 上每 60 秒执行一次函数,但我不想同时被阻塞。
如何异步执行?
import threading
import time
def f():
print("hello world")
threading.Timer(3, f).start()
if __name__ == '__main__':
f()
time.sleep(20)
使用此代码,函数 f 在 20 秒 time.time 内每 3 秒执行一次。 最后它给出了一个错误,我认为这是因为 threading.timer 没有被取消。
如何取消?
提前致谢!
【问题讨论】:
您的程序在产生第一个计时器后终止。 为了取消定时器,您需要在某处保留对它的引用,并调用其cancel
方法。
请注意,到目前为止发布的所有解决方案都存在(小)累积错误。也就是说,它们不是根据固定的开始时间计算下一个睡眠持续时间,而是简单地睡“另外 60 秒”。这绝不会睡不到60s,通常会睡60s多一点。这并不总是一个问题,但如果您想在一天内准确地睡 1440 次,请确保补偿此错误。
这在我的解决方案中不是问题。感谢您指出这一点!
相关:setTimeout()
, setInterval()
in Python. Examples for tkinter, twisted, gtk
【参考方案1】:
你可以试试 threading.Timer 类:http://docs.python.org/library/threading.html#timer-objects。
import threading
def f(f_stop):
# do something here ...
if not f_stop.is_set():
# call f() again in 60 seconds
threading.Timer(60, f, [f_stop]).start()
f_stop = threading.Event()
# start calling f now and every 60 sec thereafter
f(f_stop)
# stop the thread when needed
#f_stop.set()
【讨论】:
该代码只执行一次,是否缺少执行更多次的内容? 糟糕,感谢您添加 .start() ...我应该从我的解释器中复制/粘贴 :)。现在它应该真正每 60 秒调用一次,而不仅仅是一次。 程序执行结束后如何取消threading.Timer? 如果程序正在结束,那么调用 sys.exit(0) 也会导致线程终止。或者,您可以使用 f() 读取的外部布尔变量。然后 f() 可以根据它的值决定是否创建另一个计时器。 sys.exit 会更直接(因为它会立即停止另一个线程,而不是在 f() 再次运行之前等待另一个时间间隔)。 @DavidUnderhill @aF。感谢您的帮助。我遇到了同样的问题,尽管我结束了程序,但函数仍在运行。sys.exit(0)
在我的代码中应该放在哪里?在函数体中,还是在调用函数之后——在这种情况下是 f()
行?【参考方案2】:
最简单的方法是创建一个每 60 秒运行一次的后台线程。一个简单的实现是:
class BackgroundTimer(Thread):
def run(self):
while 1:
Time.sleep(60)
# do something
# ... SNIP ...
# Inside your main thread
# ... SNIP ...
timer = BackgroundTimer()
timer.start()
显然,如果“做某事”需要很长时间,您需要在睡眠声明中适应它。但这是一个很好的近似值。
【讨论】:
我希望你意识到空的__init__
方法是多余的:)
独立于执行时间的方法是使用实时时钟(比如time.time()
或datetime.now()
),检查任务执行开始的时间,加上60秒得到下一次发射的时间,然后在最后再次检查并从下一次发射时间中减去它以获得您的睡眠时间。【参考方案3】:
我搜索了一下,发现了 Python circuits 框架,这使得等待成为可能 对于特定事件。
.callEvent(self, event, *channels)
电路方法包含触发和暂停直到响应功能,文档说:
将给定事件触发到指定通道并暂停执行 直到它被发送出去。此方法只能作为 处理程序最高执行级别上的
yield
的参数(例如 “yield self.callEvent(event)
”)。它有效地创造和回报 将由主循环调用的生成器,直到事件发生 已发送(参见 :func:circuits.core.handlers.handler
)。
我希望你发现它和我一样有用:) ./问候
【讨论】:
【参考方案4】:这取决于您在此期间实际想要做什么。线程是最通用和最不受欢迎的方式。使用线程时,您应该注意线程问题:并非所有(非 Python)代码都允许同时从多个线程访问,线程之间的通信应该使用线程安全的数据结构(如 Queue.Queue
)来完成,您不会能够从外部中断线程,并在线程仍在运行时终止程序可能会导致解释器挂起或虚假回溯。
通常有更简单的方法。如果您在 GUI 程序中执行此操作,请使用 GUI 库的计时器或事件功能。所有的 GUI 都有这个。同样,如果您正在使用另一个事件系统,例如 Twisted 或另一个服务器进程模型,您应该能够挂钩到主事件循环以使其定期调用您的函数。非线程方法确实会导致您的程序在函数挂起时被阻塞,但不会在函数调用之间阻塞。
【讨论】:
在 GUI 计时器上执行此操作的陷阱是,当您处理超时时,GUI 会冻结。如果任务很短,没问题。如果不是,那么您的程序似乎每分钟都挂起。【参考方案5】:为什么不创建一个专用线程,在其中放置一个简单的睡眠循环:
#!/usr/bin/env python
import time
while True:
# Your code here
time.sleep(60)
【讨论】:
不,但就是这样。当我回答时,没有代码,所以这只是一个想法。 @MikeGraham 你能解释一下为什么它不是 Python 吗? @Dunatotatos 当时的修订版***.com/revisions/2223178/2 语法不正确。【参考方案6】:我认为重复运行线程的正确方法是下一个:
import threading
import time
def f():
print("hello world") # your code here
myThread.run()
if __name__ == '__main__':
myThread = threading.Timer(3, f) # timer is set to 3 seconds
myThread.start()
time.sleep(10) # it can be loop or other time consuming code here
if myThread.is_alive():
myThread.cancel()
使用此代码,函数 f 在 10 秒 time.sleep(10) 内每 3 秒执行一次。在线程的最后运行被取消。
【讨论】:
【参考方案7】:如果您想“按时”调用方法(例如,每小时按时),您可以将以下想法与您选择的任何线程机制相结合:
import time
def wait(n):
'''Wait until the next increment of n seconds'''
x = time.time()
time.sleep(n-(x%n))
print(time.asctime())
【讨论】:
【参考方案8】:[截图。删除了非异步版本]
要使用异步,您可以使用 trio。我向所有询问异步 python 的人推荐 trio。使用特别是套接字要容易得多。使用套接字,我有一个具有 1 个读取和 1 个写入功能的托儿所,写入功能从读取功能放置的双端队列写入数据;并等待发送。以下应用程序通过使用trio.run(function,parameters)
工作,然后打开一个程序在循环中运行的托儿所,每个循环之间有一个await trio.sleep(60)
,以使应用程序的其余部分有机会运行。这将在单个进程中运行程序,但您的机器可以处理 1500 个 TCP 连接,而使用非异步方法只能处理 255 个。
我还没有掌握取消语句,但我输入了move_on_after(70)
,这意味着代码将等待 10 秒,而不是执行 60 秒的睡眠,然后再进入下一个循环。
import trio
async def execTimer():
'''This function gets executed in a nursery simultaneously with the rest of the program'''
while True:
trio.move_on_after(70):
await trio.sleep(60)
print('60 Second Loop')
async def OneTime_OneMinute():
'''This functions gets run by trio.run to start the entire program'''
with trio.open_nursery() as nursery:
nursery.start_soon(execTimer)
nursery.start_soon(print,'do the rest of the program simultaneously')
def start():
'''You many have only one trio.run in the entire application'''
trio.run(OneTime_OneMinute)
if __name__ == '__main__':
start()
这将在托儿所中同时运行任意数量的功能。您可以将任何可取消语句用于程序其余部分继续运行的检查点。所有 trio 语句都是检查点,所以经常使用它们。我没有测试这个应用程序;所以如果有任何问题就问吧。 如您所见,三重奏是易于使用的功能的冠军。它基于使用函数而不是对象,但您可以根据需要使用对象。 阅读更多: [1]:https://trio.readthedocs.io/en/stable/reference-core.html
【讨论】:
以上是关于如何在 Python 中每 60 秒异步执行一次函数?的主要内容,如果未能解决你的问题,请参考以下文章