Python任務(wù)調(diào)度利器之APScheduler詳解
任務(wù)調(diào)度應(yīng)用場景
所謂的任務(wù)調(diào)度是指安排任務(wù)的執(zhí)行計劃,即何時執(zhí)行,怎么執(zhí)行等。在現(xiàn)實(shí)項(xiàng)目中經(jīng)常出現(xiàn)它們的身影;特別是數(shù)據(jù)類項(xiàng)目,比如實(shí)時統(tǒng)計每5分鐘網(wǎng)站的訪問量,就需要每5分鐘定時從日志數(shù)據(jù)分析訪問量。
總結(jié)下任務(wù)調(diào)度應(yīng)用場景:
- 離線作業(yè)調(diào)度:按時間粒度執(zhí)行某項(xiàng)任務(wù)
- 共享緩存更新:定時刷新緩存,如redis緩存;不同進(jìn)程間的共享數(shù)據(jù)
任務(wù)調(diào)度工具
- linux的crontab, 支持按照分鐘/小時/天/月/周粒度,執(zhí)行任務(wù)
- java的Quartz
- windows的任務(wù)計劃
本文介紹的是python中的任務(wù)調(diào)度庫,APScheduler(advance python scheduler)。如果你了解Quartz的話,可以看出APScheduler是Quartz的python實(shí)現(xiàn);APScheduler提供了基于時間,固定時間點(diǎn)和crontab方式的任務(wù)調(diào)用方案, 可以當(dāng)作一個跨平臺的調(diào)度工具來使用。
APScheduler
組件介紹
APScheduler由5個部分組成:觸發(fā)器、調(diào)度器、任務(wù)存儲器、執(zhí)行器和任務(wù)事件。
- 任務(wù)job:任務(wù)id和任務(wù)執(zhí)行func
- 觸發(fā)器triggers:確定任務(wù)何時開始執(zhí)行
- 任務(wù)存儲器job stores: 保存任務(wù)的狀態(tài)
- 執(zhí)行器executors:確定任務(wù)怎么執(zhí)行
- 任務(wù)事件event:監(jiān)控任務(wù)執(zhí)行異常情況
- 調(diào)度器schedulers:串聯(lián)任務(wù)的整個生命周期,添加編輯任務(wù)到任務(wù)存儲器,在任務(wù)的執(zhí)行時間到來時,把任務(wù)交給執(zhí)行器執(zhí)行返回結(jié)果;同時發(fā)出事件監(jiān)聽,監(jiān)控任務(wù)事件 。
安裝
pip install apscheduler
簡單例子
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
import datetime
# 任務(wù)執(zhí)行函數(shù)
def job_func(job_id):
print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 事件監(jiān)聽
def job_exception_listener(event):
if event.exception:
# todo:異常處理, 告警等
print('The job crashed :(')
else:
print('The job worked :)')
# 日志
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
# 定義一個后臺任務(wù)非阻塞調(diào)度器
scheduler = BackgroundScheduler()
# 添加一個任務(wù)到內(nèi)存中
# 觸發(fā)器:trigger='interval' seconds=10 每10s觸發(fā)執(zhí)行一次
# 執(zhí)行器:executor='default' 線程執(zhí)行
# 任務(wù)存儲器:jobstore='default' 默認(rèn)內(nèi)存存儲
# 最大并發(fā)數(shù):max_instances
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
# 設(shè)置任務(wù)監(jiān)聽
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 啟動調(diào)度器
scheduler.start()
運(yùn)行情況:
job 1 is runed at 2020-03-21 20:00:38
The job worked :)
job 1 is runed at 2020-03-21 20:00:48
The job worked :)
job 1 is runed at 2020-03-21 20:00:58
The job worked :)
觸發(fā)器
觸發(fā)器決定何時執(zhí)行任務(wù),APScheduler支持的觸發(fā)器有3種
trigger='interval':按固定時間周期執(zhí)行,支持weeks,days,hours,minutes, seconds, 還可指定時間范圍
sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
trigger='date': 固定時間,執(zhí)行一次
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
trigger='cron': 支持crontab方式,執(zhí)行任務(wù)
參數(shù):分鐘/小時/天/月/周粒度,也可指定時間范圍
year (int|str) – 4-digit year month (int|str) – month (1-12) day (int|str) – day of the (1-31) week (int|str) – ISO week (1-53) day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) hour (int|str) – hour (0-23) minute (int|str) – minute (0-59) second (int|str) – second (0-59) start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
例子
# 星期一到星期五,5點(diǎn)30執(zhí)行任務(wù)job_function,直到2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
# 按照crontab格式執(zhí)行, 格式為:分鐘 小時 天 月 周,*表示所有
# 5月到8月的1號到15號,0點(diǎn)0分執(zhí)行任務(wù)job_function
sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))
執(zhí)行器
執(zhí)行器決定如何執(zhí)行任務(wù);APScheduler支持4種不同執(zhí)行器,常用的有pool(線程/進(jìn)程)和gevent(io多路復(fù)用,支持高并發(fā)),默認(rèn)為pool中線程池, 不同的執(zhí)行器可以在調(diào)度器的配置中進(jìn)行配置(見調(diào)度器)
- apscheduler.executors.asyncio:同步io,阻塞
- apscheduler.executors.gevent:io多路復(fù)用,非阻塞
- apscheduler.executors.pool: 線程ThreadPoolExecutor和進(jìn)程ProcessPoolExecutor
- apscheduler.executors.twisted:基于事件驅(qū)動
任務(wù)存儲器
任務(wù)存儲器決定任務(wù)的保存方式, 默認(rèn)存儲在內(nèi)存中(MemoryJobStore),重啟后就沒有了。APScheduler支持的任務(wù)存儲器有:
- apscheduler.jobstores.memory:內(nèi)存
- apscheduler.jobstores.mongodb:存儲在mongodb
- apscheduler.jobstores.redis:存儲在redis
- apscheduler.jobstores.rethinkdb:存儲在rethinkdb
- apscheduler.jobstores.sqlalchemy:支持sqlalchemy的數(shù)據(jù)庫如mysql,sqlite等
- apscheduler.jobstores.zookeeper:zookeeper
不同的任務(wù)存儲器可以在調(diào)度器的配置中進(jìn)行配置(見調(diào)度器)
調(diào)度器
APScheduler支持的調(diào)度器方式如下,比較常用的為BlockingScheduler和BackgroundScheduler
- BlockingScheduler:適用于調(diào)度程序是進(jìn)程中唯一運(yùn)行的進(jìn)程,調(diào)用start函數(shù)會阻塞當(dāng)前線程,不能立即返回。
- BackgroundScheduler:適用于調(diào)度程序在應(yīng)用程序的后臺運(yùn)行,調(diào)用start后主線程不會阻塞。
- AsyncIOScheduler:適用于使用了asyncio模塊的應(yīng)用程序。
- GeventScheduler:適用于使用gevent模塊的應(yīng)用程序。
- TwistedScheduler:適用于構(gòu)建Twisted的應(yīng)用程序。
- QtScheduler:適用于構(gòu)建Qt的應(yīng)用程序。
從前面的例子,我們可以看到,調(diào)度器可以操作任務(wù)(并為任務(wù)指定觸發(fā)器、任務(wù)存儲器和執(zhí)行器)和監(jiān)控任務(wù)。
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
我們來詳細(xì)看下各個部分
調(diào)度器配置:在add_job我們看到j(luò)obstore和executor都是default,APScheduler在定義調(diào)度器時可以指定不同的任務(wù)存儲和執(zhí)行器,以及初始的參數(shù)
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# 通過dict方式執(zhí)行不同的jobstores、executors和默認(rèn)的參數(shù)
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
# 定義調(diào)度器
scheduler = BackgroundScheduler(jobstoresjobstores=jobstores, executorsexecutors=executors, job_defaultsjob_defaults=job_defaults, timezone=utc)
def job_func(job_id):
print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 添加任務(wù)
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='processpool', seconds=10)
# 啟動調(diào)度器
scheduler.start()
操作任務(wù):調(diào)度器可以增加,刪除,暫停,恢復(fù)和修改任務(wù)。需要注意的是這里的操作只是對未執(zhí)行的任務(wù)起作用,已經(jīng)執(zhí)行和正在執(zhí)行的任務(wù)不受這些操作的影響。
add_job
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
remove_job: 通過任務(wù)唯一的id,刪除的時候?qū)?yīng)的任務(wù)存儲器里記錄也會刪除
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
Pausing and resuming jobs:暫停和重啟任務(wù)
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
Modifying jobs:修改任務(wù)的配置
job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10)
# 修改任務(wù)的屬性
job.modify(max_instances=6, name='Alternate name')
# 修改任務(wù)的觸發(fā)器
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
監(jiān)控任務(wù)事件類型,比較常用的類型有:
- EVENT_JOB_ERROR: 表示任務(wù)在執(zhí)行過程的出現(xiàn)異常觸發(fā)
- EVENT_JOB_EXECUTED:任務(wù)執(zhí)行成功時
- EVENT_JOB_MAX_INSTANCES:調(diào)度器上執(zhí)行的任務(wù)超過配置的參數(shù)時
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
總結(jié)
到此這篇關(guān)于Python任務(wù)調(diào)度利器之APScheduler詳解的文章就介紹到這了,更多相關(guān)python任務(wù)調(diào)度 APScheduler內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python中使用sqlalchemy操作數(shù)據(jù)庫的問題總結(jié)
在探索使用?FastAPI,?SQLAlchemy,?Pydantic,Redis,?JWT?構(gòu)建的項(xiàng)目的時候,其中數(shù)據(jù)庫訪問采用SQLAlchemy,并采用異步方式,這篇文章主要介紹了在Python中使用sqlalchemy來操作數(shù)據(jù)庫的幾個小總結(jié),需要的朋友可以參考下2024-08-08
對Python生成漢字字庫文字,以及轉(zhuǎn)換為文字圖片的實(shí)例詳解
今天小編就為大家分享一篇對Python生成漢字字庫文字,以及轉(zhuǎn)換為文字圖片的實(shí)例詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-01-01
python 將日期戳(五位數(shù)時間)轉(zhuǎn)換為標(biāo)準(zhǔn)時間
這篇文章主要介紹了python 將日期戳(五位數(shù)時間)轉(zhuǎn)換為標(biāo)準(zhǔn)時間的實(shí)現(xiàn)方法,本文圖文并茂給大家介紹的非常詳細(xì),具有一定的參考借鑒價值 ,需要的朋友可以參考下2019-07-07
Python下調(diào)用Linux的Shell命令的方法
有時候難免需要直接調(diào)用Shell命令來完成一些比較簡單的操作,這篇文章主要介紹了Python下調(diào)用Linux的Shell命令的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-06-06

