Python使用apscheduler模塊設(shè)置定時任務(wù)的實現(xiàn)
一、安裝
pip install apscheduler
二、ApScheduler 簡介
1 APScheduler的組件
triggers:觸發(fā)器
triggers包含任務(wù)執(zhí)行的調(diào)度邏輯,決定任務(wù)按照什么邏輯進行定時執(zhí)行
job stores;任務(wù)存儲器
存儲了調(diào)度任務(wù)
executors:執(zhí)行器
用例執(zhí)行任務(wù)的,包含線程池以及進程池等的創(chuàng)建和調(diào)用等等
schedulers:調(diào)度器
屬于控制面,將其他幾個方面組織起來的作用、
2 調(diào)度器的種類
調(diào)度器有以下幾種常見類型,其中最常用的BackgroundScheduler,即非阻塞式,因為在一般情況下,定時任務(wù)都會在放到web服務(wù)中,如果使用阻塞式,則無法啟動web服務(wù),而使用非阻塞式,則將定時任務(wù)設(shè)定后,就不管了,繼續(xù)執(zhí)行后面的web服務(wù),只要web服務(wù)在運行,定時任務(wù)就是一直有效的
- BlockingScheduler: 阻塞式
- BackgroundScheduler: 非阻塞式(后臺運行)
- AsyncIOScheduler: 當使用asyncio模塊時使用
- GeventScheduler: 當使用gevent模塊時使用
- TornadoScheduler: 構(gòu)建Tornado應(yīng)用時使用
- TwistedScheduler: 構(gòu)建Twisted應(yīng)用時使用
- QtScheduler: 構(gòu)建Qt應(yīng)用時使用
3 內(nèi)置的觸發(fā)器類型
- date: 在某個時間點執(zhí)行一次時使用
- interval: 固定的時間間隔循環(huán)執(zhí)行時使用
- cron: 在一天中特定的時間點執(zhí)行時使用
- calendarinterval: 當想在在一天中特定時間點或以日歷為基礎(chǔ)的時間間隔內(nèi)執(zhí)行時使用
三、使用舉例
這里jiu就以非阻塞式BackgroundScheduler調(diào)度器為例展開
1 使用date類型的觸發(fā)器
如下,使用了三種設(shè)置日期和時間的方法
from apscheduler.schedulers.background import BackgroundScheduler import time from datetime import date from datetime import datetime def do_func(name,age): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age)) def main(): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? sched=BackgroundScheduler() ? ? # 通過date 設(shè)置指定日期執(zhí)行 ? ? sched.add_job(do_func,trigger="date",run_date=date(2022,5,25),args=("張三豐",100)) ? ? # 通過datetime,設(shè)置指定日期額指定時刻執(zhí)行 ? ? sched.add_job(do_func, trigger="date", run_date=datetime(2022, 5, 25,14,0,10), args=("張三豐", 100)) ? ? # 直接使用文本的方式指定日期和時刻表 ? ? sched.add_job(do_func, trigger="date", run_date="2022-05-25 14:0:20", args=("張三豐", 100)) ? ? sched.start() if __name__=="__main__": ? ? main() ? ? while True: ? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? ? ? time.sleep(1)
執(zhí)行結(jié)果如下,可以發(fā)現(xiàn),第一個通過date指定日期的默認是0點執(zhí)行,顯然時間已經(jīng)過了,不會執(zhí)行,第二和第三個則在規(guī)定的時間點執(zhí)行了,這里還需要注意的是,通過打印可以看出,main函數(shù)執(zhí)行完成后,已經(jīng)開始執(zhí)行main函數(shù)下面的while循環(huán)打印語句了,而在執(zhí)行循環(huán)的過程中,定時任務(wù)仍然生效,這就是非阻塞式調(diào)度器的原理,如果是阻塞式,則在此代碼中,會一直卡在main函數(shù)中,main下面額while循環(huán)語句是不會執(zhí)行的,因此在實際使用中,非阻塞式應(yīng)用的是非常多的
2022-05-25 14:00:02
2022-05-25 14:00:02
C:\Users\hitre\.virtualenvs\zentaolinkgitlab-QCS5yxD9\lib\site-packages\apscheduler\util.py:436: PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495). For more details on migrating to a PEP 495-compliant implementation, see https://pytz-deprecation-shim.readthedocs.io/en/latest/migration.html
return tzinfo.localize(dt)
Run time of job "do_func (trigger: date[2022-05-25 00:00:00 CST], next run at: 2022-05-25 00:00:00 CST)" was missed by 14:00:02.088260
2022-05-25 14:00:03
2022-05-25 14:00:04
2022-05-25 14:00:05
2022-05-25 14:00:06
2022-05-25 14:00:07
2022-05-25 14:00:08
2022-05-25 14:00:09
2022-05-25 14:00:10 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:00:10
2022-05-25 14:00:11
2022-05-25 14:00:12
2022-05-25 14:00:13
2022-05-25 14:00:14
2022-05-25 14:00:15
2022-05-25 14:00:16
2022-05-25 14:00:17
2022-05-25 14:00:18
2022-05-25 14:00:19
2022-05-25 14:00:20 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:00:20
2022-05-25 14:00:21
2022-05-25 14:00:22
2 使用interval類型的觸發(fā)器
如下代碼演示了時間間隔循環(huán)執(zhí)行的使用例子
from apscheduler.schedulers.background import BackgroundScheduler import time from datetime import date from datetime import datetime def do_func(name,age): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age)) def main(): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? sched=BackgroundScheduler() ? ? # 每3秒執(zhí)行一次 ? ? sched.add_job(do_func,trigger="interval",args=("張三豐",100),seconds=3) ? ? # 每3分鐘執(zhí)行一次 ? ? sched.add_job(do_func, trigger="interval", args=("張三豐", 100), minutes=3) ? ? # 每3小時執(zhí)行一次 ? ? sched.add_job(do_func, trigger="interval", args=("張三豐", 100), hours=3) ? ? # 每3天執(zhí)行一次 ? ? sched.add_job(do_func, trigger="interval", args=("張三豐", 100), days=3) ? ? # 每3周執(zhí)行一次 ? ? sched.add_job(do_func, trigger="interval", args=("張三豐", 100), weeks=3) ? ? sched.start() if __name__=="__main__": ? ? main() ? ? while True: ? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? ? ? time.sleep(1)
上面的代碼中因為時間跨度比較大,這里只演示妹3秒執(zhí)行一次的代碼
代碼如下:
from apscheduler.schedulers.background import BackgroundScheduler import time from datetime import date from datetime import datetime def do_func(name,age): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age)) def main(): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? sched=BackgroundScheduler() ? ? # 每3秒執(zhí)行一次 ? ? sched.add_job(do_func,trigger="interval",args=("張三豐",100),seconds=3) ? ? sched.start() if __name__=="__main__": ? ? main() ? ? while True: ? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? ? ? time.sleep(1)
執(zhí)行結(jié)果如下:
2022-05-25 14:14:04
2022-05-25 14:14:04
2022-05-25 14:14:05
2022-05-25 14:14:06
2022-05-25 14:14:07 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:14:07
2022-05-25 14:14:08
2022-05-25 14:14:09
2022-05-25 14:14:10 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:14:10
2022-05-25 14:14:11
2022-05-25 14:14:12
2022-05-25 14:14:13 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:14:13
3 使用cron類型的觸發(fā)器
cron的觸發(fā)器有點類似linux上crontab定時器的使用,代碼如下:
from apscheduler.schedulers.background import BackgroundScheduler import time from datetime import date from datetime import datetime def do_func(name,age): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age)) def main(): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? sched=BackgroundScheduler() ? ? # 任務(wù)會在6月、7月、8月、11月和12月的第三個周五,00:00、01:00、02:00和03:00觸發(fā) ? ? sched.add_job(do_func,trigger="cron",month='6-8,11-12', day='3rd fri', hour='0-3',args=("張三豐",100)) ? ? sched.start() if __name__=="__main__": ? ? main() ? ? while True: ? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? ? ? time.sleep(1)
這里需要注意的是,可以省略不需要的字段。當省略時間參數(shù)時,在顯式指定參數(shù)之前的參數(shù)會被設(shè)定為*,之后的參數(shù)會被設(shè)定為最小值,week 和day_of_week的最小值為*
day=1, minute=20 等同于 year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0
此外,也可以直接使用crontab表達式,如下:
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger import time from datetime import date from datetime import datetime def do_func(name,age): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age)) def main(): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? sched=BackgroundScheduler() ? ? # 任務(wù)會在6月、7月、8月、11月和12月的第三個周五,00:00、01:00、02:00和03:00觸發(fā) ? ? sched.add_job(do_func,trigger=CronTrigger.from_crontab('48 10 1-15 sep-nov *'),args=("張三豐",100)) ? ? sched.start() if __name__=="__main__": ? ? main() ? ? while True: ? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? ? ? time.sleep(1)
四、定時器使用裝飾器的方法
以間隔時間循環(huán)執(zhí)行的代碼為例,如下為未使用裝飾器的方式
from apscheduler.schedulers.background import BackgroundScheduler import time from datetime import date from datetime import datetime def do_func(name,age): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age)) def main(): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? sched=BackgroundScheduler() ? ? # 每3秒執(zhí)行一次 ? ? sched.add_job(do_func,trigger="interval",args=("張三豐",100),seconds=3) ? ? sched.start() if __name__=="__main__": ? ? main() ? ? while True: ? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? ? ? time.sleep(1)
修改為使用裝飾器的方式如下:
from apscheduler.schedulers.background import BackgroundScheduler import time sched=BackgroundScheduler() @sched.scheduled_job(trigger="interval",args=("張三豐",100),seconds=3) def do_func(name,age): ? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age)) if __name__=="__main__": ? ? sched.start() ? ? while True: ? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) ? ? ? ? time.sleep(1)
執(zhí)行結(jié)果如下:
2022-05-25 14:34:10
2022-05-25 14:34:11
2022-05-25 14:34:12
2022-05-25 14:34:13 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:34:13
2022-05-25 14:34:14
2022-05-25 14:34:15
2022-05-25 14:34:16 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:34:16
2022-05-25 14:34:17
2022-05-25 14:34:18
2022-05-25 14:34:19 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:34:19
2022-05-25 14:34:20
到此這篇關(guān)于Python使用apscheduler模塊設(shè)置定時任務(wù)的實現(xiàn)的文章就介紹到這了,更多相關(guān)Python apscheduler定時任務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python 爬蟲之selenium可視化爬蟲的實現(xiàn)
這篇文章主要介紹了python 爬蟲之selenium可視化爬蟲的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-12-12python?request要求接口參數(shù)必須是json數(shù)據(jù)的處理方式
這篇文章主要介紹了python?request要求接口參數(shù)必須是json數(shù)據(jù)的處理方式,Reqeusts支持以form表單形式發(fā)送post請求,只需要將請求的參數(shù)構(gòu)造成一個字典,然后傳給requests.post()的data參數(shù)即可,本文通過實例代碼給大家介紹的非常詳細,需要的朋友參考下吧2022-08-08python3使用mutagen進行音頻元數(shù)據(jù)處理的方法
mutagen是一個處理音頻元數(shù)據(jù)的python模塊,支持多種音頻格式,是一個純粹的python庫,僅依賴python標準庫,可在Python?3.7及以上版本運行,支持Linux、Windows?和?macOS系統(tǒng),這篇文章主要介紹了python3使用mutagen進行音頻元數(shù)據(jù)處理,需要的朋友可以參考下2022-10-10