基于Python?schedule的任務調(diào)度詳解
schedule 是Python的第三方任務調(diào)度庫,可以用來做定時任務,API簡單易用,可以按照秒,分,小時,日期或者自定義事件執(zhí)行時間,不需要額外的流程,非常輕量級,沒有外部依賴,兼容Python 3.7、3.8、3.9、3.10和3.11。
安裝
$ pip install schedule
每隔一段時間執(zhí)行一次
import schedule
import time
def job():
print("I'm working...")
# Run job every 3 second/minute/hour/day/week,
# Starting 3 second/minute/hour/day/week from now
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)
# Run job every minute at the 23rd second
schedule.every().minute.at(":23").do(job)
# Run job every hour at the 42nd minute
schedule.every().hour.at(":42").do(job)
# Run jobs every 5th hour, 20 minutes and 30 seconds in.
# If current time is 02:00, first execution is at 06:20:30
schedule.every(5).hours.at("20:30").do(job)
# Run job every day at specific HH:MM and next HH:MM:SS
schedule.every().day.at("10:30").do(job)
schedule.every().day.at("10:30:42").do(job)
schedule.every().day.at("12:42", "Europe/Amsterdam").do(job)
# Run job on a specific day of the week
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)

用裝飾器執(zhí)行
用@repeat裝飾器來設置執(zhí)行參數(shù)。
from schedule import every, repeat, run_pending
import time
@repeat(every(10).minutes)
def job():
print("I am a scheduled job")
while True:
run_pending()
time.sleep(1)
給任務傳遞參數(shù)
do()可以傳遞額外的參數(shù)給任務。
import schedule
def greet(name):
print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
from schedule import every, repeat
@repeat(every().second, "World")
@repeat(every().day, "Mars")
def hello(planet):
print("Hello", planet)
取消任務
import schedule
def some_task():
print('Hello world')
job = schedule.every().day.at('22:30').do(some_task)
schedule.cancel_job(job)
schedule.cancel_job(job)取消任務。
執(zhí)行一次任務
啟動任務后,調(diào)用schedule.CancelJob取消任務,就只執(zhí)行一次任務。
import schedule
import time
def job_that_executes_once():
# Do some work that only needs to happen once...
return schedule.CancelJob
schedule.every().day.at('22:30').do(job_that_executes_once)
while True:
schedule.run_pending()
time.sleep(1)
獲取所有的任務
schedule.get_jobs()獲取所有任務。
import schedule
def hello():
print('Hello world')
schedule.every().second.do(hello)
all_jobs = schedule.get_jobs()
取消所有任務
schedule.clear()取消所有任務。
import schedule
def greet(name):
print('Hello {}'.format(name))
schedule.every().second.do(greet)
schedule.clear()
獲取指定任務
通過tag設置標簽,schedule.get_jobs('friend')來獲取指定標簽的任務。
import schedule
def greet(name):
print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
取消指定任務
import schedule
def greet(name):
print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
schedule.clear('daily-tasks')
指定區(qū)間運行任務
def my_job():
print('Foo')
# Run every 5 to 10 seconds.
schedule.every(5).to(10).seconds.do(my_job)
執(zhí)行任務直到指定時間(超時)
import schedule
from datetime import datetime, timedelta, time
def job():
print('Boo')
# run job until a 18:30 today
schedule.every(1).hours.until("18:30").do(job)
# run job until a 2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)
# Schedule a job to run for the next 8 hours
schedule.every(1).hours.until(timedelta(hours=8)).do(job)
# Run my_job until today 11:33:42
schedule.every(1).hours.until(time(11, 33, 42)).do(job)
# run job until a specific datetime
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
執(zhí)行到下一個任務
schedule.idle_seconds()用來獲取下一個任務計劃執(zhí)行的秒數(shù)。如果沒有任務了,返回None。
import schedule
import time
def job():
print('Hello')
schedule.every(5).seconds.do(job)
while 1:
n = schedule.idle_seconds()
if n is None:
# no more jobs
break
elif n > 0:
# sleep exactly the right amount of time
time.sleep(n)
schedule.run_pending()
立即運行
import schedule
def job_1():
print('Foo')
def job_2():
print('Bar')
schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)
schedule.run_all()
# Add the delay_seconds argument to run the jobs with a number
# of seconds delay in between.
schedule.run_all(delay_seconds=10)
schedule.run_all()會讓任務忽略計劃,立即執(zhí)行。schedule.run_all(delay_seconds=10)讓任務恢復執(zhí)行。
后臺執(zhí)行
import threading
import time
import schedule
def run_continuously(interval=1):
"""Continuously run, while executing pending jobs at each
elapsed time interval.
@return cease_continuous_run: threading. Event which can
be set to cease continuous run. Please note that it is
*intended behavior that run_continuously() does not run
missed jobs*. For example, if you've registered a job that
should run every minute and you set a continuous run
interval of one hour then your job won't be run 60 times
at each interval but only once.
"""
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
while not cease_continuous_run.is_set():
schedule.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.start()
return cease_continuous_run
def background_job():
print('Hello from the background thread')
schedule.every().second.do(background_job)
# Start the background thread
stop_run_continuously = run_continuously()
# Do some other things...
time.sleep(10)
# Stop the background thread
stop_run_continuously.set()
并行執(zhí)行
import threading
import time
import schedule
def job():
print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
job_thread = threading.Thread(target=job_func)
job_thread.start()
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
while 1:
schedule.run_pending()
time.sleep(1)
這里每個任務都啟動一個線程來并行執(zhí)行。
import time
import threading
import schedule
import queue
def job():
print("I'm working")
def worker_main():
while 1:
job_func = jobqueue.get()
job_func()
jobqueue.task_done()
jobqueue = queue.Queue()
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
worker_thread = threading.Thread(target=worker_main)
worker_thread.start()
while 1:
schedule.run_pending()
time.sleep(1)
這里用到用了一個任務隊列來嚴格執(zhí)行線程數(shù)。
設置時區(qū)
# Pass a timezone as a string
schedule.every().day.at("12:42", "Europe/Amsterdam").do(job)
# Pass an pytz timezone object
from pytz import timezone
schedule.every().friday.at("12:42", timezone("Africa/Lagos")).do(job)
以上就是基于Python schedule的任務調(diào)度詳解的詳細內(nèi)容,更多關于Python schedule任務調(diào)度的資料請關注腳本之家其它相關文章!
相關文章
Python中數(shù)字以及算數(shù)運算符的相關使用
這篇文章主要介紹了Python中數(shù)字以及算數(shù)運算符的相關使用,是Python入門學習中的基礎知識,需要的朋友可以參考下2015-10-10
運行python提示no module named sklearn的解決方法
這篇文章主要介紹了運行python提示no module named sklearn的解決方法,需要的朋友可以參考下2020-11-11
python計算數(shù)字或者數(shù)組的階乘的實現(xiàn)
本文主要介紹了python計算數(shù)字或者數(shù)組的階乘,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08

