Python 中使用 APScheduler 配置异步任务

在 Python 开发中,经常需要执行一些定时任务,本文对比了 APSchedulerCeleryCelery 三个定时库,最终选择 APScheduler 作为定时器框架。

本文主要介绍如何在 FastApi 中集成异步的定时任务,同时列出自己配置过程中遇到的坑点。

测试代码

可以将下列代码保存为 .py 文件,运行测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 需要提前安装  pip install apscheduler
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler


async def async_job():
print("这是一个异步任务")
await asyncio.sleep(1) # 模拟异步IO操作


async def main():
scheduler = AsyncIOScheduler()
scheduler.add_job(async_job, "interval", seconds=3)
scheduler.start()

# 保持程序运行,直到中断
try:
await asyncio.sleep(float("inf"))
except (KeyboardInterrupt, SystemExit):
pass


# 使用 asyncio.run 启动异步主函数
asyncio.run(main())

FastAPI 集成

为了更好的复用,在 python 中,对 schedule 进一步乾抽象,UML 图如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
classDiagram
class BaseScheduleJob {
+job_id: str
+start()
+run_async()
}

class BaseIntervalScheduleJob {
+interval: int
+start()
}

class BaseCronScheduleJob {
+corn: str
+start()
}

class IntervalTestScheduleJob {
+run_async()
}

BaseScheduleJob <|-- BaseIntervalScheduleJob : inherits
BaseScheduleJob <|-- BaseCronScheduleJob : inherits
BaseIntervalScheduleJob <|-- IntervalTestScheduleJob : inherits

在启动时,注入生命周期函数,在该函数中启动调度器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
生命周期管理
"""
logger.info("应用生命周期启动...")

# region 定时任务
# 在这里可以放置应用启动时需要执行的代码
from app.schedule.scheduler import use_scheduler

logger.info("初始化定时任务调度器...")
use_scheduler()
logger.info("定时任务调度器初始化完成!")
# endregion

# yield 之前的代码(如加载 ML 模型)在上下文进入时执行,即应用启动时。
# yield 之后的代码(如清理模型)在上下文退出时执行,即应用关闭时。
yield
# Clean up the ML models and release the resources
logger.info("应用生命周期结束...")


app = FastAPI(lifespan=lifespan)

scheduler 初始化的代码节选如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 用于持久化的设置,使用MongoDB
# client 用于设置你自己的 MongoDB 的 handler, 即 MongoClient对象
# database 为 apscheduler
jobstores = {"default": MongoDBJobStore(client=pymongo_client)}

# 要特别注意,executors 只能在 BackgroundScheduler 中使用,在 AsyncIOScheduler 不要配置,会报错
# executors = {"default": ThreadPoolExecutor()}
job_defaults = {"coalesce": False, "max_instances": 5}

# 这里使用 BackgroundScheduler 即可
scheduler = AsyncIOScheduler(
jobstores=jobstores,
# executors=executors,
job_defaults=job_defaults,
timezone=datetime.timezone.utc,
)

遇到的坑

设置 ThreadPoolExecutor 后,提示 never awaited,导致计划不执行

1
2
RuntimeWarning: coroutine 'BaseScheduleJob.run_async' was never awaited
del work_item

使用 AsyncIOScheduler 时不应设置 executors

持久化到数据库后,内部状态无法持久保存

当将计划持久到数据库后,对象类中的状态无法被更新,应将触发的回调当做静态函数使用。

猜测是因为每次执行时,都会从数据库中重新实例化对象。

参考

本文参考以下文章,在此致以诚挚谢意!

  1. Python定时任务-schedule vs. Celery vs. APScheduler 比较_python shced和apscheduler区别-CSDN博客