一文詳解Python定時任務觸發(fā)
APScheduler
APScheduler 四個組件分別為:
調(diào)度器(scheduler)、觸發(fā)器(trigger),作業(yè)存儲(job store),執(zhí)行器(executor)
安裝命令:
pip install setuptools pip install --ignore-installed apscheduler
1.新建調(diào)度器schedulers
BlockingScheduler : 調(diào)度器在當前進程的主線程中運行,也就是會阻塞當前線程
BackgroundScheduler : 調(diào)度器在后臺線程中運行,不會阻塞當前線程
import datetime as dt from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler()
2.添加調(diào)度任務trigger
① date 觸發(fā)器:(指定時間點觸發(fā)),參數(shù)如下:
- run_date(datetime或str):任務運行的日期或時間
- timezone(datetime.tzinfo或str):指定時區(qū)
# 例1:在 2020-9-24 時刻運行一次 func 方法 scheduler.add_job(func, 'date', run_date = dt.date(2020, 9, 24)) # 例2: 在 2020-9-24 15:10:00 時刻運行一次 func 方法 scheduler.add_job(func, 'date', run_date = dt.datetime(2020, 9, 24, 15, 10, 0)) # 例3: 在 2020-9-24 15:11:00 時刻運行一次 func 方法 scheduler.add_job(func, 'date', run_date = '2020-9-24 15:11:00')
② interval 觸發(fā)器: (固定時間間隔觸發(fā)),參數(shù)如下:
- weeks(int):間隔幾周
- days(int):間隔幾天
- hours(int):間隔幾小時
- minutes(int):間隔幾分鐘
- seconds(int):間隔幾秒鐘
- start_date(datetime或str):開始時間
- end_date(datetime或str):結束時間
- timezone(datetime.tzinfo或str):時區(qū)
# 例1:每隔兩分鐘執(zhí)行一次 func 方法 scheduler.add_job(func, 'interval', minutes = 2) # 例2:在 2020-9-24 15:15:00 ~ 2020-9-24 15:20:00 之間, 每隔兩分鐘執(zhí)行一次 func 方法 scheduler.add_job(func, 'interval', minutes = 2, start_date = '2020-9-24 15:15:00' , end_date = '2020-9-24 15:20:00')
③ cron 觸發(fā)器:(在指定時間周期性地觸發(fā)),參數(shù)如下:
- year(int 或 str):年
- month(int 或 str):月
- day(int 或 str):日
- week(int 或 str):周(1-53)
- day_of_week(int 或 str):星期幾(0-6)
- hour(int 或 str):時
- minute(int 或 str):分
- second(int 或 str):秒
- start_date(datetime或str):最早開始時間(包含)
- end_date(datetime或str):最晚結束時間(包含)
- timezone(datetime.tzinfo或str):指定時區(qū)
字符 :
1. * 每一(每一分)
2. ? 表示不關心,任意
3. - 范圍 (小時:1-12,1到12點運行)
4. , 標示多個值 (小時 1,2,3 1點2點3點運行)
5. / 遞增觸發(fā)(0/15,從0開始每15秒運行一次)
6. L 最后(日L,當月最后一天,周L周六)
7. W 指定日期最近的工作日(周一到周五)
8. # 序號(表示每月的第幾個周幾)
# 例:在每年 1-3、7-9 月份中的每個星期一、二中的 00:00, 01:00, 02:00 和 03:00 執(zhí)行 func 任務 scheduler.add_job(func, 'cron', month = '1-3,7-9',day='0, tue', hour='0-3')
3.運行調(diào)度任務
scheduler.start()
3.1 測試時間
def forecast_adjust(): now_temp = datetime.now() print('執(zhí)行方案一', now_temp, '時間間隔: ', now_temp-t0) def for2(): now_temp = datetime.now() print('執(zhí)行方案二', now_temp, '時間間隔: ', now_temp-t0) def fortime3(): now_temp = datetime.now() print('執(zhí)行方案三', now_temp, '時間間隔: ', now_temp-t0) return '9999999999999' def a__(): b = scheduler.add_job(fortime3, 'cron', hour='15', minute = '18') c = scheduler.add_job(fortime3, 'cron', hour='15', minute = '30') d = scheduler.add_job(fortime3, 'cron', hour='15', minute = '45') print(b) print(c) print(c) return 'kkkqq' t0 = datetime.now() scheduler = BlockingScheduler() # 采用阻塞的方式 scheduler.add_job(func=forecast_adjust, trigger=CronTrigger(minute="*/1", second=20, timezone=tz_now), args=[]) scheduler.add_job(func=for2, trigger=CronTrigger(minute="*/5", second=10, timezone=tz_now), args=[]) k = a__() print(k) scheduler.start()
4.特點,其他操作
APScheduler 定點、定時:
四個組件分別為:觸發(fā)器(trigger),作業(yè)存儲器(job store),執(zhí)行器(executor),調(diào)度器(scheduler)
(1)job stores:對調(diào)度任務的管理:
① 添加job:
# add_job():可以改變或者移除 job scheduler.add_job(func, 'interval', minutes = 2) # scheduled_job():只適用于應用運行期間不會改變的 job scheduler.scheduled_job(func, 'interval', minutes = 2)
②移除job:
# remove_job() :根據(jù) job 的 id 來移除,所以要在 job 創(chuàng)建的時候指定一個 id scheduler.add_job(func, 'interval', minutes = 2, id = 'job_one') scheduler.remove_job(job_one) # job.remove() :對 job 執(zhí)行 remove 方法 job = add_job(func, 'interval', minutes = 2, id = 'job_one') job.remvoe()
③ 暫停job:
apscheduler.job.Job.pause() apscheduler.schedulers.base.BaseScheduler.pause_job()
④ 恢復job:
apscheduler.job.Job.resume() apscheduler.schedulers.base.BaseScheduler.resume_job()
⑤ 修改job:
# modify_job() scheduler.modify_job('job_one', minutes = 5) # job.modify() job = scheduler.add_job(func, 'interval', minutes = 2) job.modify(minutes = 5)
⑥ 關閉job:
scheduler.shutdown() scheduler.shutdown(wait=false)
(2)executors:執(zhí)行調(diào)度任務的模塊,常用的 executor 有兩種:
ProcessPoolExecutor ThreadPoolExecutor
到此這篇關于一文詳解Python定時任務觸發(fā)的文章就介紹到這了,更多相關Python定時任務內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
python 根據(jù)網(wǎng)易云歌曲的ID 直接下載歌曲的實例
今天小編就為大家分享一篇python 根據(jù)網(wǎng)易云歌曲的ID 直接下載歌曲的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08django自帶serializers序列化返回指定字段的方法
今天小編就為大家分享一篇django自帶serializers序列化返回指定字段的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08anaconda創(chuàng)建、查看、激活與刪除虛擬環(huán)境指令總結
在跑項目時常常會安裝很多的包,也通常會遇到需要安裝指定版本的包,以及包與包不兼容的問題,下面這篇文章主要給大家介紹了關于anaconda創(chuàng)建、查看、激活與刪除虛擬環(huán)境指令的相關資料,需要的朋友可以參考下2022-11-11Pandas DataFrame實現(xiàn)任意位置插入一列或一行
Pandas是Python中最流行的數(shù)據(jù)處理和分析庫之一,在數(shù)據(jù)分析過程中,有時候需要在Dataframe中插入新的數(shù)據(jù)列,本文主要介紹了Pandas DataFrame實現(xiàn)任意位置插入一列或一行,具有一定的參考價值,感興趣的可以了解一下2023-08-08Python 的第三方調(diào)試庫 ???pysnooper?? 使用示例
這篇文章主要介紹了Python 的第三方調(diào)試庫 ???pysnooper?? 使用示例的相關資料,需要的朋友可以參考下2023-02-02