詳解Python使用apscheduler定時(shí)執(zhí)行任務(wù)
apscheduler 的使用
我們項(xiàng)目中總是避免不了要使用一些定時(shí)任務(wù),比如說(shuō)最近的項(xiàng)目,用戶點(diǎn)擊報(bào)名考試以后需要在考試日期臨近的時(shí)候推送小程序消息提醒到客戶微信上,翻了翻 fastapi 中的實(shí)現(xiàn),雖然方法和包也不少,但是要不就是太重了(比如需要再開(kāi)服務(wù),還要依賴 redis,都不好用),雖然也可以使用 time 模塊的 time.sleep()機(jī)上 fastapi 的后臺(tái)任務(wù)變相實(shí)現(xiàn),但是相對(duì)簡(jiǎn)單的功能還行,復(fù)雜點(diǎn)的代碼起來(lái)就麻煩了,所以還是專人專事找個(gè)負(fù)責(zé)這個(gè)額的包吧。找來(lái)找去發(fā)現(xiàn) APScheduler 就挺適合,代碼簡(jiǎn)單,實(shí)現(xiàn)效果也很好,這里做個(gè)記錄!
安裝
pip install apscheduler
主要組成部分
概念性東西,混個(gè)臉熟,代碼比這些定義好理解。
觸發(fā)器(trigger)包含調(diào)度邏輯,每一個(gè)作業(yè)有它自己的觸發(fā)器,用于決定接下來(lái)哪一個(gè)作業(yè)會(huì)運(yùn)行。除了他們自己初始配置意外,觸發(fā)器完全是無(wú)狀態(tài)的。說(shuō)人話就是你指定那種方式觸發(fā)當(dāng)前的任務(wù)。
類型 | 解釋 |
DateTrigger | 到期執(zhí)行(到xxxx年x月x日 x時(shí)x分x秒執(zhí)行) 對(duì)應(yīng)DateTrigger |
IntervalTrigger | 間隔執(zhí)行(每5秒執(zhí)行一次) |
CronTrigger | 一個(gè)crontab類型的條件(這個(gè)比較復(fù)雜,比如周一到周四的4-5點(diǎn)每5秒執(zhí)行一次) |
作業(yè)存儲(chǔ)(job store)存儲(chǔ)被調(diào)度的作業(yè),默認(rèn)的作業(yè)存儲(chǔ)是簡(jiǎn)單地把作業(yè)保存在內(nèi)存中,其他的作業(yè)存儲(chǔ)是將作業(yè)保存在數(shù)據(jù)庫(kù)中。一個(gè)作業(yè)的數(shù)據(jù)講在保存在持久化作業(yè)存儲(chǔ)時(shí)被序列化,并在加載時(shí)被反序列化。調(diào)度器不能分享同一個(gè)作業(yè)存儲(chǔ)。
Jobstore在scheduler中初始化,另外也可通過(guò)scheduler的add_jobstore動(dòng)態(tài)添加Jobstore。每個(gè)jobstore
都會(huì)綁定一個(gè)alias,scheduler在Add Job時(shí),根據(jù)指定的jobstore在scheduler中找到相應(yīng)的jobstore,并
將job添加到j(luò)obstore中。
Jobstore主要是通過(guò)pickle庫(kù)的loads和dumps【實(shí)現(xiàn)核心是通過(guò)python的__getstate__和__setstate__重寫(xiě)
實(shí)現(xiàn)】,每次變更時(shí)將Job動(dòng)態(tài)保存到存儲(chǔ)中,使用時(shí)再動(dòng)態(tài)的加載出來(lái),作為存儲(chǔ)的可以是redis,也可以
是數(shù)據(jù)庫(kù)【通過(guò)sqlarchemy這個(gè)庫(kù)集成多種數(shù)據(jù)庫(kù)】,也可以是mongodb等
目前APScheduler支持的Jobstore:
MemoryJobStore
MongoDBJobStore
RedisJobStore
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore
執(zhí)行器(executor)處理作業(yè)的運(yùn)行,他們通常通過(guò)在作業(yè)中提交制定的可調(diào)用對(duì)象到一個(gè)線程或者進(jìn)城池來(lái)進(jìn)行。當(dāng)作業(yè)完成時(shí),執(zhí)行器將會(huì)通知調(diào)度器。
- 說(shuō)人話就是添加任務(wù)時(shí)候用它來(lái)包裝的,executor的種類會(huì)根據(jù)不同的調(diào)度來(lái)選擇,如果選擇AsyncIO作為調(diào)度的庫(kù),那么選擇AsyncIOExecutor,如果選擇tornado作為調(diào)度的庫(kù),選擇TornadoExecutor,如果選擇啟動(dòng)進(jìn)程作為調(diào)度,選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以Executor的選擇需要根據(jù)實(shí)際的scheduler來(lái)選擇不同的執(zhí)行器
目前APScheduler支持的Executor:
AsyncIOExecutor
GeventExecutor
ThreadPoolExecutor
ProcessPoolExecutor
TornadoExecutor
TwistedExecutor
調(diào)度器(scheduler)是其他的組成部分。你通常在應(yīng)用只有一個(gè)調(diào)度器,應(yīng)用的開(kāi)發(fā)者通常不會(huì)直接處理作業(yè)存儲(chǔ)、調(diào)度器和觸發(fā)器,相反,調(diào)度器提供了處理這些的合適的接口。配置作業(yè)存儲(chǔ)和執(zhí)行器可以在調(diào)度器中完成,例如添加、修改和移除作業(yè).
Scheduler是APScheduler的核心,所有相關(guān)組件通過(guò)其定義。scheduler啟動(dòng)之后,將開(kāi)始按照配置的任務(wù)進(jìn)行調(diào)度。
除了依據(jù)所有定義Job的trigger生成的將要調(diào)度時(shí)間喚醒調(diào)度之外。當(dāng)發(fā)生Job信息變更時(shí)也會(huì)觸發(fā)調(diào)度。
scheduler可根據(jù)自身的需求選擇不同的組件,如果是使用AsyncIO則選擇AsyncIOScheduler,使用tornado則
選擇TornadoScheduler。
目前APScheduler支持的Scheduler:
AsyncIOScheduler
BackgroundScheduler
BlockingScheduler
GeventScheduler
QtScheduler
TornadoScheduler
TwistedScheduler
簡(jiǎn)單應(yīng)用
import time from apscheduler.schedulers.blocking import BlockingScheduler # 引入后臺(tái) def my_job(): print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) sched = BlockingScheduler() sched.add_job(my_job, 'interval', seconds=5) sched.start()
完整代碼
# trigeers 觸發(fā)器 # job stores job 存儲(chǔ) # executors 執(zhí)行器 # schedulers 調(diào)度器 from pytz import utc from sqlalchemy import func from apscheduler.schedulers.background import BackgroundScheduler,AsyncIOScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor jobstores = { # 可以配置多個(gè)存儲(chǔ) #'mongo': {'type': 'mongodb'}, 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') # SQLAlchemyJobStore指定存儲(chǔ)鏈接 } executors = { 'default': {'type': 'threadpool', 'max_workers': 20}, # 最大工作線程數(shù)20 'processpool': ProcessPoolExecutor(max_workers=5) # 最大工作進(jìn)程數(shù)為5 } job_defaults = { 'coalesce': False, # 關(guān)閉新job的合并,當(dāng)job延誤或者異常原因未執(zhí)行時(shí) 'max_instances': 3 # 并發(fā)運(yùn)行新job默認(rèn)最大實(shí)例多少 } scheduler = BackgroundScheduler() # .. do something else here, maybe add jobs etc. scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) # utc作為調(diào)度程序的時(shí)區(qū) import os import time def print_time(name): print(f'{name} - {time.ctime()}') def add_job(job_id, func, args, seconds): """添加job""" print(f"添加間隔執(zhí)行任務(wù)job - {job_id}") scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds) def add_coun_job(job_id, func, args, start_time): """添加job""" print(f"添加一次執(zhí)行任務(wù)job - {job_id}") scheduler.add_job(id=job_id, func=func, args=args, trigger='date',timezone='Asia/Shanghai', run_date=start_time) # scheduler.add_job(func=print_time, trigger='date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2']) def remove_job(job_id): """移除job""" scheduler.remove_job(job_id) print(f"移除job - {job_id}") def pause_job(job_id): """停止job""" scheduler.pause_job(job_id) print(f"停止job - {job_id}") def resume_job(job_id): """恢復(fù)job""" scheduler.resume_job(job_id) print(f"恢復(fù)job - {job_id}") def get_jobs(): """獲取所有job信息,包括已停止的""" res = scheduler.get_jobs() print(f"所有job - {res}") def print_jobs(): print(f"詳細(xì)job信息") scheduler.print_jobs() def start(): """啟動(dòng)調(diào)度器""" scheduler.start() def shutdown(): """關(guān)閉調(diào)度器""" scheduler.shutdown() if __name__ == '__main__': scheduler = BackgroundScheduler() # start() # print('Press Ctrl+{0} to exit \n'.format('Break' if os.name == 'nt' else 'C')) # add_job('job_A', func=print_time, args=("A", ), seconds=1) # add_job('job_B', func=print_time, args=("B", ), seconds=2) # time.sleep(6) # pause_job('job_A') # 停止a # get_jobs() #得到所有job # time.sleep(6) # print_jobs() # resume_job('job_A') # time.sleep(6) # remove_job('job_A') # time.sleep(6) from datetime import datetime import pytz start() date_temp = datetime(2022, 2, 19, 17, 30, 5) # scheduler.add_job(print_time, 'date', run_date=datetime.now(pytz.timezone('America/Manaus')), args=['text']) # scheduler.add_job(print_time, 'date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2']) add_coun_job(job_id="job_C",func=print_time,args=('一次性執(zhí)行任務(wù)',),start_time=datetime(2022, 2, 19, 18, 4, 0).astimezone()) time.sleep(130) try: shutdown() except RuntimeError: pass
到此這篇關(guān)于詳解Python使用apscheduler定時(shí)執(zhí)行任務(wù)的文章就介紹到這了,更多相關(guān)Python apscheduler內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python Socket 編程知識(shí)點(diǎn)詳細(xì)介紹
這篇文章主要介紹了Python Socket 編程,Socket又稱為套接字,它是所有網(wǎng)絡(luò)通信的基礎(chǔ)。網(wǎng)絡(luò)通信其實(shí)就是進(jìn)程間的通信,Socket主要是使用IP地址,協(xié)議,端口號(hào)來(lái)標(biāo)識(shí)一個(gè)進(jìn)程,下文詳細(xì)內(nèi)容,需要的小伙伴可以參考一下2022-02-02Python標(biāo)準(zhǔn)庫(kù)os常用函數(shù)和屬性詳解
os模塊是Python標(biāo)準(zhǔn)庫(kù)中的一個(gè)用于訪問(wèn)操作系統(tǒng)相關(guān)功能的模塊,os模塊提供了一種可移植的使用操作系統(tǒng)功能的方法,本文給大家介紹下?OS標(biāo)準(zhǔn)庫(kù)常用函數(shù)和屬性,感興趣的朋友跟隨小編一起看看吧2022-11-11python中文件變化監(jiān)控示例(watchdog)
這篇文章主要介紹了python中文件變化監(jiān)控示例(watchdog),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-10-10僅用500行Python代碼實(shí)現(xiàn)一個(gè)英文解析器的教程
這篇文章主要介紹了僅用500行Python代碼實(shí)現(xiàn)一個(gè)英文解析器的教程,自然語(yǔ)言處理近來(lái)也是業(yè)界中一個(gè)熱門課題,作者為NLP方向的開(kāi)發(fā)者,需要的朋友可以參考下2015-04-04Python paramiko 模塊淺談與SSH主要功能模擬解析
這篇文章主要介紹了Python paramiko 模塊詳解與SSH主要功能模擬,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-02-02python獲取局域網(wǎng)占帶寬最大3個(gè)ip的方法
這篇文章主要介紹了python獲取局域網(wǎng)占帶寬最大3個(gè)ip的方法,涉及Python解析URL參數(shù)的相關(guān)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-07-07