如何使用调度库运行异步函数?

Posted

技术标签:

【中文标题】如何使用调度库运行异步函数?【英文标题】:How can I run an async function using the schedule library? 【发布时间】:2019-01-02 22:17:49 【问题描述】:

我正在使用 discord.py rewrite 编写一个不和谐机器人,我想每天在特定时间运行一个函数。我根本没有使用异步函数的经验,而且我不知道如何在不使用“await”的情况下运行一个函数。这只是我的一部分代码,这就是为什么有些东西可能没有定义的原因。

async def send_channel():
    try:
        await active_channel.send('daily text here')
    except Exception:
        active_channel_id = None
        active_channel = None

async def timer():
    while True:
        schedule.run_pending()
        await asyncio.sleep(3)
        schedule.every().day.at("21:57").do(await send_channel())

@bot.event
async def on_ready():
    print("Logged in as")
    print(bot.user.name)
    print(bot.user.id)
    print("------")

    bot.loop.create_task(timer())

使用schedule.every().day.at("00:00").do() 函数,当我将await send_channel() 放入.do() 的参数中时出现此错误:

self.job_func = functools.partial(job_func, *args, **kwargs) TypeError:第一个参数必须是可调用的

但是当我不使用等待时,我只有send_channel() 作为参数,我得到这个错误:

RuntimeWarning:从未等待协程“send_channel”

我不是特别擅长编程,所以如果有人可以尝试为我简化它,那就太棒了。

谢谢

【问题讨论】:

【参考方案1】:

你正在做的事情不起作用,因为do 需要一个函数(或另一个可调用的),但你试图await 或调用一个函数,然后传递它结果。

await send_channel() 阻塞直到发送完成,然后给你None,这不是一个函数。 send_channel() 返回一个协程,您可以稍后等待它做一些工作,这也不是一个函数。

如果你只传递了send_channel,那是一个函数,但它是一个ascynio 协程函数,schedule 不知道如何运行。


此外,与其尝试将schedule 集成到asyncio 事件循环中,并弄清楚如何将异步作业包装为schedule 任务,反之亦然等等,不如只提供@ 987654335@自己的线程。

There's a FAQ entry on this:

如何在不阻塞主线程的情况下连续运行调度器?

在单独的线程中运行调度程序。 Mrwick 为这个问题写了一个很好的解决方案here(查找 run_continuously())。

基本思想很简单。将您的 timer 函数更改为:

schedstop = threading.Event()
def timer():
    while not schedstop.is_set():
        schedule.run_pending()
        time.sleep(3)
schedthread = threading.Thread(target=timer)
schedthread.start()

在程序开始时执行此操作,甚至在您开始 asyncio 事件循环之前。

在退出时,停止调度线程:

schedstop.set()

现在,要添加任务,无论您是在***代码中,还是在异步协程中,还是在 scheduler 任务中,都可以这样添加:

schedule.every().day.at("21:57").do(task)

现在,回到您的第一个问题。您要运行的任务不是普通函数,它是一个 asyncio 协程,它必须作为主事件循环的一部分在主线程上运行。

但这正是call_soon_threadsafe 的用途。你要调用的是:

bot.loop.call_soon_threadsafe(send_channel)

要让scheduler 运行它,您只需将bot.loop.call_soon_threadsafe 作为函数传递,send_channel 作为参数传递。

所以,把它们放在一起:

schedule.every().day.at("21:57").do(
    bot.loop.call_soon_threadsafe, send_channel)

【讨论】:

穿线部分有必要吗?因为我无法让它工作而且我对线程不是很熟悉(我知道我应该这样做。)就“schedule.every().day.at("21:57").do(bot .loop.call_soon_threadsafe, send_channel)”,那部分对我也不起作用。我仍然收到错误,从未等待协程'send_channel'。而且我认为 send_channel() 函数必须异步才能工作。【参考方案2】:

这是一个老问题,但我最近遇到了同样的问题。您可以使用run_coroutine_threadsafe 将协程安排到事件循环(而不是回调):

asyncio.run_coroutine_threadsafe(async_function(), bot.loop)

【讨论】:

【参考方案3】:

另一种选择是使用apscheduler 的Asyncioscheduler,它更自然地与异步函数(例如send_channel)一起工作。在您的情况下,您可以简单地编写以下形式:

scheduler = AsyncIOScheduler()
scheduler.add_job(send_channel, trigger=tr)
scheduler.start()

其中tr 是一个触发器对象。您可以使用 IntervalTrigger(间隔 1 天,开始日期为 21:57),也可以使用 CronTrigger

请注意,建议在程序结束时调用调度程序对象上的shutdown()

【讨论】:

【参考方案4】:

discord.py 中的内置解决方案是使用 discord.ext.tasks 扩展。这使您可以注册要在特定时间间隔重复调用的任务。当机器人启动时,我们会将循环的启动延迟到目标时间,然后每 24 小时运行一次任务:

import asyncio
from discord.ext import commands, tasks
from datetime import datetime, timedelta

bot = commands.Bot("!")

@tasks.loop(hours=24)
async def my_task():
    ...

@my_task.before_loop
async def before_my_task():
    hour = 21
    minute = 57
    await bot.wait_until_ready()
    now = datetime.now()
    future = datetime.datetime(now.year, now.month, now.day, hour, minute)
    if now.hour >= hour and now.minute > minute:
        future += timedelta(days=1)
    await asyncio.sleep((future-now).seconds)

my_task.start()

【讨论】:

【参考方案5】:

经历了同样的问题,我找到了一个混合了以前的一些解决方案的解决方案:

import schedule
from discord.ext import tasks

@tasks.loop(hours=24)
async def send_channel():
    pass

然后在我定义的主线程之前

def event_starter(func):
    if not func.is_running():
        func.start()

schedstop = threading.Event()
def timer():
    while not schedstop.is_set():
        schedule.run_pending()
        sleep(1)
schedthread = threading.Thread(target=timer)
schedthread.start()

最后,在主线程中:

if __name__ == "__main__":

    ...

    schedule.every().day.at('21:57:00').do(event_starter, send_channel)

    ...

【讨论】:

以上是关于如何使用调度库运行异步函数?的主要内容,如果未能解决你的问题,请参考以下文章

如何在异步函数中调用 redux 调度作为函数体?

Redux thunk:等待异步函数调度

如何序列化异步pthreads代码段

详解异步任务 | 看 Serverless Task 如何解决任务调度&可观测性中的问题

如何使用不同的参数异步运行很多函数?

如何在同一线程上调度异步以进行串行处理