Python使用apscheduler模塊設置定時任務的實現
一、安裝
pip install apscheduler
二、ApScheduler 簡介
1 APScheduler的組件
triggers:觸發(fā)器
triggers包含任務執(zhí)行的調度邏輯,決定任務按照什么邏輯進行定時執(zhí)行
job stores;任務存儲器
存儲了調度任務
executors:執(zhí)行器
用例執(zhí)行任務的,包含線程池以及進程池等的創(chuàng)建和調用等等
schedulers:調度器
屬于控制面,將其他幾個方面組織起來的作用、
2 調度器的種類
調度器有以下幾種常見類型,其中最常用的BackgroundScheduler,即非阻塞式,因為在一般情況下,定時任務都會在放到web服務中,如果使用阻塞式,則無法啟動web服務,而使用非阻塞式,則將定時任務設定后,就不管了,繼續(xù)執(zhí)行后面的web服務,只要web服務在運行,定時任務就是一直有效的
- BlockingScheduler: 阻塞式
- BackgroundScheduler: 非阻塞式(后臺運行)
- AsyncIOScheduler: 當使用asyncio模塊時使用
- GeventScheduler: 當使用gevent模塊時使用
- TornadoScheduler: 構建Tornado應用時使用
- TwistedScheduler: 構建Twisted應用時使用
- QtScheduler: 構建Qt應用時使用
3 內置的觸發(fā)器類型
- date: 在某個時間點執(zhí)行一次時使用
- interval: 固定的時間間隔循環(huán)執(zhí)行時使用
- cron: 在一天中特定的時間點執(zhí)行時使用
- calendarinterval: 當想在在一天中特定時間點或以日歷為基礎的時間間隔內執(zhí)行時使用
三、使用舉例
這里jiu就以非阻塞式BackgroundScheduler調度器為例展開
1 使用date類型的觸發(fā)器
如下,使用了三種設置日期和時間的方法
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime
def do_func(name,age):
? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age))
def main():
? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? sched=BackgroundScheduler()
? ? # 通過date 設置指定日期執(zhí)行
? ? sched.add_job(do_func,trigger="date",run_date=date(2022,5,25),args=("張三豐",100))
? ? # 通過datetime,設置指定日期額指定時刻執(zhí)行
? ? sched.add_job(do_func, trigger="date", run_date=datetime(2022, 5, 25,14,0,10), args=("張三豐", 100))
? ? # 直接使用文本的方式指定日期和時刻表
? ? sched.add_job(do_func, trigger="date", run_date="2022-05-25 14:0:20", args=("張三豐", 100))
? ? sched.start()
if __name__=="__main__":
? ? main()
? ? while True:
? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? ? ? time.sleep(1)執(zhí)行結果如下,可以發(fā)現,第一個通過date指定日期的默認是0點執(zhí)行,顯然時間已經過了,不會執(zhí)行,第二和第三個則在規(guī)定的時間點執(zhí)行了,這里還需要注意的是,通過打印可以看出,main函數執(zhí)行完成后,已經開始執(zhí)行main函數下面的while循環(huán)打印語句了,而在執(zhí)行循環(huán)的過程中,定時任務仍然生效,這就是非阻塞式調度器的原理,如果是阻塞式,則在此代碼中,會一直卡在main函數中,main下面額while循環(huán)語句是不會執(zhí)行的,因此在實際使用中,非阻塞式應用的是非常多的
2022-05-25 14:00:02
2022-05-25 14:00:02
C:\Users\hitre\.virtualenvs\zentaolinkgitlab-QCS5yxD9\lib\site-packages\apscheduler\util.py:436: PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495). For more details on migrating to a PEP 495-compliant implementation, see https://pytz-deprecation-shim.readthedocs.io/en/latest/migration.html
return tzinfo.localize(dt)
Run time of job "do_func (trigger: date[2022-05-25 00:00:00 CST], next run at: 2022-05-25 00:00:00 CST)" was missed by 14:00:02.088260
2022-05-25 14:00:03
2022-05-25 14:00:04
2022-05-25 14:00:05
2022-05-25 14:00:06
2022-05-25 14:00:07
2022-05-25 14:00:08
2022-05-25 14:00:09
2022-05-25 14:00:10 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:00:10
2022-05-25 14:00:11
2022-05-25 14:00:12
2022-05-25 14:00:13
2022-05-25 14:00:14
2022-05-25 14:00:15
2022-05-25 14:00:16
2022-05-25 14:00:17
2022-05-25 14:00:18
2022-05-25 14:00:19
2022-05-25 14:00:20 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:00:20
2022-05-25 14:00:21
2022-05-25 14:00:22
2 使用interval類型的觸發(fā)器
如下代碼演示了時間間隔循環(huán)執(zhí)行的使用例子
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime
def do_func(name,age):
? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age))
def main():
? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? sched=BackgroundScheduler()
? ? # 每3秒執(zhí)行一次
? ? sched.add_job(do_func,trigger="interval",args=("張三豐",100),seconds=3)
? ? # 每3分鐘執(zhí)行一次
? ? sched.add_job(do_func, trigger="interval", args=("張三豐", 100), minutes=3)
? ? # 每3小時執(zhí)行一次
? ? sched.add_job(do_func, trigger="interval", args=("張三豐", 100), hours=3)
? ? # 每3天執(zhí)行一次
? ? sched.add_job(do_func, trigger="interval", args=("張三豐", 100), days=3)
? ? # 每3周執(zhí)行一次
? ? sched.add_job(do_func, trigger="interval", args=("張三豐", 100), weeks=3)
? ? sched.start()
if __name__=="__main__":
? ? main()
? ? while True:
? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? ? ? time.sleep(1)上面的代碼中因為時間跨度比較大,這里只演示妹3秒執(zhí)行一次的代碼
代碼如下:
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime
def do_func(name,age):
? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age))
def main():
? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? sched=BackgroundScheduler()
? ? # 每3秒執(zhí)行一次
? ? sched.add_job(do_func,trigger="interval",args=("張三豐",100),seconds=3)
? ? sched.start()
if __name__=="__main__":
? ? main()
? ? while True:
? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? ? ? time.sleep(1)執(zhí)行結果如下:
2022-05-25 14:14:04
2022-05-25 14:14:04
2022-05-25 14:14:05
2022-05-25 14:14:06
2022-05-25 14:14:07 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:14:07
2022-05-25 14:14:08
2022-05-25 14:14:09
2022-05-25 14:14:10 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:14:10
2022-05-25 14:14:11
2022-05-25 14:14:12
2022-05-25 14:14:13 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:14:13
3 使用cron類型的觸發(fā)器
cron的觸發(fā)器有點類似linux上crontab定時器的使用,代碼如下:
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime
def do_func(name,age):
? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age))
def main():
? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? sched=BackgroundScheduler()
? ? # 任務會在6月、7月、8月、11月和12月的第三個周五,00:00、01:00、02:00和03:00觸發(fā)
? ? sched.add_job(do_func,trigger="cron",month='6-8,11-12', day='3rd fri', hour='0-3',args=("張三豐",100))
? ? sched.start()
if __name__=="__main__":
? ? main()
? ? while True:
? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? ? ? time.sleep(1)這里需要注意的是,可以省略不需要的字段。當省略時間參數時,在顯式指定參數之前的參數會被設定為*,之后的參數會被設定為最小值,week 和day_of_week的最小值為*
day=1, minute=20 等同于 year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0
此外,也可以直接使用crontab表達式,如下:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import time
from datetime import date
from datetime import datetime
def do_func(name,age):
? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age))
def main():
? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? sched=BackgroundScheduler()
? ? # 任務會在6月、7月、8月、11月和12月的第三個周五,00:00、01:00、02:00和03:00觸發(fā)
? ? sched.add_job(do_func,trigger=CronTrigger.from_crontab('48 10 1-15 sep-nov *'),args=("張三豐",100))
? ? sched.start()
if __name__=="__main__":
? ? main()
? ? while True:
? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? ? ? time.sleep(1)四、定時器使用裝飾器的方法
以間隔時間循環(huán)執(zhí)行的代碼為例,如下為未使用裝飾器的方式
from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime
def do_func(name,age):
? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age))
def main():
? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? sched=BackgroundScheduler()
? ? # 每3秒執(zhí)行一次
? ? sched.add_job(do_func,trigger="interval",args=("張三豐",100),seconds=3)
? ? sched.start()
if __name__=="__main__":
? ? main()
? ? while True:
? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? ? ? time.sleep(1)修改為使用裝飾器的方式如下:
from apscheduler.schedulers.background import BackgroundScheduler
import time
sched=BackgroundScheduler()
@sched.scheduled_job(trigger="interval",args=("張三豐",100),seconds=3)
def do_func(name,age):
? ? print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年齡:"+str(age))
if __name__=="__main__":
? ? sched.start()
? ? while True:
? ? ? ? print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
? ? ? ? time.sleep(1)執(zhí)行結果如下:
2022-05-25 14:34:10
2022-05-25 14:34:11
2022-05-25 14:34:12
2022-05-25 14:34:13 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:34:13
2022-05-25 14:34:14
2022-05-25 14:34:15
2022-05-25 14:34:16 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:34:16
2022-05-25 14:34:17
2022-05-25 14:34:18
2022-05-25 14:34:19 in do func : 姓名:張三豐 年齡:100
2022-05-25 14:34:19
2022-05-25 14:34:20
到此這篇關于Python使用apscheduler模塊設置定時任務的實現的文章就介紹到這了,更多相關Python apscheduler定時任務內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
在 Django/Flask 開發(fā)服務器上使用 HTTPS
使用 Django 或 Flask 這種框架開發(fā) web app 的時候一般都會用內建服務器開發(fā)和調試程序,等程序完成后再移交到生產環(huán)境部署。問題是這些內建服務器通常都不支持 HTTPS,那么我們來探討下開啟https吧2014-07-07
python 實用工具狀態(tài)機transitions
這篇文章主要介紹了python 實用工具狀態(tài)機transitions的使用,幫助大家更好的理解和學習python,感興趣的朋友可以了解下2020-11-11
pycharm中使用request和Pytest進行接口測試的方法
這篇文章主要介紹了pycharm中使用request和Pytest進行接口測試的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07
學習python之編寫簡單簡單連接數據庫并執(zhí)行查詢操作
這篇文章主要介紹了學習python之編寫簡單簡單連接數據庫并執(zhí)行查詢操作,需要的朋友可以參考下2016-02-02

