在 Python 开发中,经常需要执行一些定时任务,本文对比了
APScheduler、Celery、Celery
三个定时库,最终选择 APScheduler 作为定时器框架。
本文主要介绍如何在 FastApi
中集成异步的定时任务,同时列出自己配置过程中遇到的坑点。
 
测试代码 
可以将下列代码保存为 .py 文件,运行测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import  asynciofrom  apscheduler.schedulers.asyncio import  AsyncIOSchedulerasync  def  async_job ():    print ("这是一个异步任务" )     await  asyncio.sleep(1 )   async  def  main ():    scheduler = AsyncIOScheduler()     scheduler.add_job(async_job, "interval" , seconds=3 )     scheduler.start()          try :         await  asyncio.sleep(float ("inf" ))     except  (KeyboardInterrupt, SystemExit):         pass  asyncio.run(main()) 
 
FastAPI 集成 
为了更好的复用,在 python 中,对 schedule 进一步乾抽象,UML
图如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 classDiagram     class BaseScheduleJob {         +job_id: str         +start()         +run_async()     }     class BaseIntervalScheduleJob {         +interval: int         +start()     }     class BaseCronScheduleJob {         +corn: str         +start()             }     class IntervalTestScheduleJob {         +run_async()     }          BaseScheduleJob <|-- BaseIntervalScheduleJob : inherits     BaseScheduleJob <|-- BaseCronScheduleJob : inherits     BaseIntervalScheduleJob <|-- IntervalTestScheduleJob : inherits 
 
在启动时,注入生命周期函数,在该函数中启动调度器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @asynccontextmanager async  def  lifespan (app: FastAPI ):    """      生命周期管理     """     logger.info("应用生命周期启动..." )               from  app.schedule.scheduler import  use_scheduler     logger.info("初始化定时任务调度器..." )     use_scheduler()     logger.info("定时任务调度器初始化完成!" )                    yield           logger.info("应用生命周期结束..." ) app = FastAPI(lifespan=lifespan) 
 
scheduler 初始化的代码节选如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 jobstores = {"default" : MongoDBJobStore(client=pymongo_client)} job_defaults = {"coalesce" : False , "max_instances" : 5 } scheduler = AsyncIOScheduler(     jobstores=jobstores,          job_defaults=job_defaults,     timezone=datetime.timezone.utc, ) 
 
遇到的坑 
设置 ThreadPoolExecutor 后,提示 never
awaited,导致计划不执行 
1 2 RuntimeWarning: coroutine 'BaseScheduleJob.run_async' was never awaited   del work_item 
 
使用 AsyncIOScheduler 时不应设置 executors
持久化到数据库后,内部状态无法持久保存 
当将计划持久到数据库后,对象类中的状态无法被更新,应将触发的回调当做静态函数使用。
猜测是因为每次执行时,都会从数据库中重新实例化对象。
参考 
本文参考以下文章,在此致以诚挚谢意!
Python定时任务-schedule
vs. Celery vs. APScheduler 比较_python
shced和apscheduler区别-CSDN博客