基于Python?schedule的任務(wù)調(diào)度詳解
schedule 是Python的第三方任務(wù)調(diào)度庫,可以用來做定時(shí)任務(wù),API簡單易用,可以按照秒,分,小時(shí),日期或者自定義事件執(zhí)行時(shí)間,不需要額外的流程,非常輕量級(jí),沒有外部依賴,兼容Python 3.7、3.8、3.9、3.10和3.11。
安裝
$ pip install schedule
每隔一段時(shí)間執(zhí)行一次
import schedule import time def job(): print("I'm working...") # Run job every 3 second/minute/hour/day/week, # Starting 3 second/minute/hour/day/week from now schedule.every(3).seconds.do(job) schedule.every(3).minutes.do(job) schedule.every(3).hours.do(job) schedule.every(3).days.do(job) schedule.every(3).weeks.do(job) # Run job every minute at the 23rd second schedule.every().minute.at(":23").do(job) # Run job every hour at the 42nd minute schedule.every().hour.at(":42").do(job) # Run jobs every 5th hour, 20 minutes and 30 seconds in. # If current time is 02:00, first execution is at 06:20:30 schedule.every(5).hours.at("20:30").do(job) # Run job every day at specific HH:MM and next HH:MM:SS schedule.every().day.at("10:30").do(job) schedule.every().day.at("10:30:42").do(job) schedule.every().day.at("12:42", "Europe/Amsterdam").do(job) # Run job on a specific day of the week schedule.every().monday.do(job) schedule.every().wednesday.at("13:15").do(job) schedule.every().minute.at(":17").do(job) while True: schedule.run_pending() time.sleep(1)
用裝飾器執(zhí)行
用@repeat裝飾器來設(shè)置執(zhí)行參數(shù)。
from schedule import every, repeat, run_pending import time @repeat(every(10).minutes) def job(): print("I am a scheduled job") while True: run_pending() time.sleep(1)
給任務(wù)傳遞參數(shù)
do()可以傳遞額外的參數(shù)給任務(wù)。
import schedule def greet(name): print('Hello', name) schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob') from schedule import every, repeat @repeat(every().second, "World") @repeat(every().day, "Mars") def hello(planet): print("Hello", planet)
取消任務(wù)
import schedule def some_task(): print('Hello world') job = schedule.every().day.at('22:30').do(some_task) schedule.cancel_job(job)
schedule.cancel_job(job)取消任務(wù)。
執(zhí)行一次任務(wù)
啟動(dòng)任務(wù)后,調(diào)用schedule.CancelJob取消任務(wù),就只執(zhí)行一次任務(wù)。
import schedule import time def job_that_executes_once(): # Do some work that only needs to happen once... return schedule.CancelJob schedule.every().day.at('22:30').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1)
獲取所有的任務(wù)
schedule.get_jobs()獲取所有任務(wù)。
import schedule def hello(): print('Hello world') schedule.every().second.do(hello) all_jobs = schedule.get_jobs()
取消所有任務(wù)
schedule.clear()取消所有任務(wù)。
import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().second.do(greet) schedule.clear()
獲取指定任務(wù)
通過tag設(shè)置標(biāo)簽,schedule.get_jobs('friend')來獲取指定標(biāo)簽的任務(wù)。
import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') friends = schedule.get_jobs('friend')
取消指定任務(wù)
import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') schedule.clear('daily-tasks')
指定區(qū)間運(yùn)行任務(wù)
def my_job(): print('Foo') # Run every 5 to 10 seconds. schedule.every(5).to(10).seconds.do(my_job)
執(zhí)行任務(wù)直到指定時(shí)間(超時(shí))
import schedule from datetime import datetime, timedelta, time def job(): print('Boo') # run job until a 18:30 today schedule.every(1).hours.until("18:30").do(job) # run job until a 2030-01-01 18:33 today schedule.every(1).hours.until("2030-01-01 18:33").do(job) # Schedule a job to run for the next 8 hours schedule.every(1).hours.until(timedelta(hours=8)).do(job) # Run my_job until today 11:33:42 schedule.every(1).hours.until(time(11, 33, 42)).do(job) # run job until a specific datetime schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
執(zhí)行到下一個(gè)任務(wù)
schedule.idle_seconds()用來獲取下一個(gè)任務(wù)計(jì)劃執(zhí)行的秒數(shù)。如果沒有任務(wù)了,返回None。
import schedule import time def job(): print('Hello') schedule.every(5).seconds.do(job) while 1: n = schedule.idle_seconds() if n is None: # no more jobs break elif n > 0: # sleep exactly the right amount of time time.sleep(n) schedule.run_pending()
立即運(yùn)行
import schedule def job_1(): print('Foo') def job_2(): print('Bar') schedule.every().monday.at("12:40").do(job_1) schedule.every().tuesday.at("16:40").do(job_2) schedule.run_all() # Add the delay_seconds argument to run the jobs with a number # of seconds delay in between. schedule.run_all(delay_seconds=10)
schedule.run_all()會(huì)讓任務(wù)忽略計(jì)劃,立即執(zhí)行。schedule.run_all(delay_seconds=10)讓任務(wù)恢復(fù)執(zhí)行。
后臺(tái)執(zhí)行
import threading import time import schedule def run_continuously(interval=1): """Continuously run, while executing pending jobs at each elapsed time interval. @return cease_continuous_run: threading. Event which can be set to cease continuous run. Please note that it is *intended behavior that run_continuously() does not run missed jobs*. For example, if you've registered a job that should run every minute and you set a continuous run interval of one hour then your job won't be run 60 times at each interval but only once. """ cease_continuous_run = threading.Event() class ScheduleThread(threading.Thread): @classmethod def run(cls): while not cease_continuous_run.is_set(): schedule.run_pending() time.sleep(interval) continuous_thread = ScheduleThread() continuous_thread.start() return cease_continuous_run def background_job(): print('Hello from the background thread') schedule.every().second.do(background_job) # Start the background thread stop_run_continuously = run_continuously() # Do some other things... time.sleep(10) # Stop the background thread stop_run_continuously.set()
并行執(zhí)行
import threading import time import schedule def job(): print("I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) while 1: schedule.run_pending() time.sleep(1)
這里每個(gè)任務(wù)都啟動(dòng)一個(gè)線程來并行執(zhí)行。
import time import threading import schedule import queue def job(): print("I'm working") def worker_main(): while 1: job_func = jobqueue.get() job_func() jobqueue.task_done() jobqueue = queue.Queue() schedule.every(10).seconds.do(jobqueue.put, job) schedule.every(10).seconds.do(jobqueue.put, job) schedule.every(10).seconds.do(jobqueue.put, job) schedule.every(10).seconds.do(jobqueue.put, job) schedule.every(10).seconds.do(jobqueue.put, job) worker_thread = threading.Thread(target=worker_main) worker_thread.start() while 1: schedule.run_pending() time.sleep(1)
這里用到用了一個(gè)任務(wù)隊(duì)列來嚴(yán)格執(zhí)行線程數(shù)。
設(shè)置時(shí)區(qū)
# Pass a timezone as a string schedule.every().day.at("12:42", "Europe/Amsterdam").do(job) # Pass an pytz timezone object from pytz import timezone schedule.every().friday.at("12:42", timezone("Africa/Lagos")).do(job)
以上就是基于Python schedule的任務(wù)調(diào)度詳解的詳細(xì)內(nèi)容,更多關(guān)于Python schedule任務(wù)調(diào)度的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python中數(shù)字以及算數(shù)運(yùn)算符的相關(guān)使用
這篇文章主要介紹了Python中數(shù)字以及算數(shù)運(yùn)算符的相關(guān)使用,是Python入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-10-10Flaks基礎(chǔ)之在URL中添加變量的實(shí)現(xiàn)詳解
這篇文章主要介紹了在python開發(fā)中,F(xiàn)laks框架之上在URL鏈接中添加變量的實(shí)現(xiàn)方式,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-09-09運(yùn)行python提示no module named sklearn的解決方法
這篇文章主要介紹了運(yùn)行python提示no module named sklearn的解決方法,需要的朋友可以參考下2020-11-11python計(jì)算數(shù)字或者數(shù)組的階乘的實(shí)現(xiàn)
本文主要介紹了python計(jì)算數(shù)字或者數(shù)組的階乘,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08python+POP3實(shí)現(xiàn)批量下載郵件附件
這篇文章主要為大家詳細(xì)介紹了python+POP3實(shí)現(xiàn)批量下載郵件附件,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-06-06Python中的Request請(qǐng)求重試機(jī)制
這篇文章主要介紹了Python中的Request請(qǐng)求重試機(jī)制,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06