欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python周期任務(wù)調(diào)度工具Schedule使用詳解

 更新時(shí)間:2021年11月23日 08:41:43   作者:Python學(xué)習(xí)與數(shù)據(jù)挖掘  
這篇文章主要為大家介紹了python周期任務(wù)調(diào)度工具Schedule的使用及示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

如果你想周期性地執(zhí)行某個(gè) Python 腳本,最出名的選擇應(yīng)該是 Crontab 腳本,但是 Crontab 具有以下缺點(diǎn):

1.不方便執(zhí)行秒級任務(wù)。

2.當(dāng)需要執(zhí)行的定時(shí)任務(wù)有上百個(gè)的時(shí)候,Crontab 的管理就會特別不方便。

還有一個(gè)選擇是 Celery,但是 Celery 的配置比較麻煩,如果你只是需要一個(gè)輕量級的調(diào)度工具,Celery 不會是一個(gè)好選擇。

在你想要使用一個(gè)輕量級的任務(wù)調(diào)度工具,而且希望它盡量簡單、容易使用、不需要外部依賴,最好能夠容納 Crontab 的所有基本功能,那么 Schedule 模塊是你的不二之選。

使用它來調(diào)度任務(wù)可能只需要幾行代碼,感受一下:

import schedule
import time
def job():
    print("I'm working...")
schedule.every(10).minutes.do(job)
while True:
    schedule.run_pending()
    time.sleep(1)

上面的代碼表示每10分鐘執(zhí)行一次 job 函數(shù),非常簡單方便。你只需要引入 schedule 模塊,通過調(diào)用 scedule.every(時(shí)間數(shù)).時(shí)間類型.do(job) 發(fā)布周期任務(wù)。

發(fā)布后的周期任務(wù)需要用 run_pending 函數(shù)來檢測是否執(zhí)行,因此需要一個(gè) While 循環(huán)不斷地輪詢這個(gè)函數(shù)。

下面具體講講Schedule模塊的安裝和初級、進(jìn)階使用方法。

1.準(zhǔn)備

開始之前,你要確保Python和pip已經(jīng)成功安裝在電腦上,請選擇以下任一種方式輸入命令安裝依賴:

Windows 環(huán)境 打開 Cmd (開始-運(yùn)行-CMD)。

MacOS 環(huán)境 打開 Terminal (command+空格輸入Terminal)。

如果你用的是 VSCode編輯器 或 Pycharm,可以直接使用界面下方的Terminal.

pip install schedule

2.基本使用

最基本的使用在文首已經(jīng)提到過,下面給大家展示更多的調(diào)度任務(wù)例子:

import schedule
import time
def job():
    print("I'm working...")
# 每十分鐘執(zhí)行任務(wù)
schedule.every(10).minutes.do(job)
# 每個(gè)小時(shí)執(zhí)行任務(wù)
schedule.every().hour.do(job)
# 每天的10:30執(zhí)行任務(wù)
schedule.every().day.at("10:30").do(job)
# 每個(gè)月執(zhí)行任務(wù)
schedule.every().monday.do(job)
# 每個(gè)星期三的13:15分執(zhí)行任務(wù)
schedule.every().wednesday.at("13:15").do(job)
# 每分鐘的第17秒執(zhí)行任務(wù)
schedule.every().minute.at(":17").do(job)
while True:
    schedule.run_pending()
    time.sleep(1)

可以看到,從月到秒的配置,上面的例子都覆蓋到了。不過如果你想只運(yùn)行一次任務(wù)的話,可以這么配

import schedule
import time
def job_that_executes_once():
    # 此處編寫的任務(wù)只會執(zhí)行一次...
    return schedule.CancelJob
schedule.every().day.at('22:30').do(job_that_executes_once)
while True:
    schedule.run_pending()
    time.sleep(1)

參數(shù)傳遞

如果你有參數(shù)需要傳遞給作業(yè)去執(zhí)行,你只需要這么做:

import schedule
def greet(name):
    print('Hello', name)
# do() 將額外的參數(shù)傳遞給job函數(shù)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

獲取目前所有的作業(yè)

如果你想獲取目前所有的作業(yè):

import schedule
def hello():
    print('Hello world')
schedule.every().second.do(hello)
all_jobs = schedule.get_jobs()

取消所有作業(yè)

如果某些機(jī)制觸發(fā)了,你需要立即清除當(dāng)前程序的所有作業(yè):

import schedule
def greet(name):
    print('Hello {}'.format(name))
schedule.every().second.do(greet)
schedule.clear()

標(biāo)簽功能

在設(shè)置作業(yè)的時(shí)候,為了后續(xù)方便管理作業(yè),你可以給作業(yè)打個(gè)標(biāo)簽,這樣你可以通過標(biāo)簽過濾獲取作業(yè)或取消作業(yè)。

import schedule
def greet(name):
    print('Hello {}'.format(name))
# .tag 打標(biāo)簽
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
# get_jobs(標(biāo)簽):可以獲取所有該標(biāo)簽的任務(wù)
friends = schedule.get_jobs('friend')
# 取消所有 daily-tasks 標(biāo)簽的任務(wù)
schedule.clear('daily-tasks')

設(shè)定作業(yè)截止時(shí)間

如果你需要讓某個(gè)作業(yè)到某個(gè)時(shí)間截止,你可以通過這個(gè)方法:

import schedule
from datetime import datetime, timedelta, time
def job():
    print('Boo')
# 每個(gè)小時(shí)運(yùn)行作業(yè),18:30后停止
schedule.every(1).hours.until("18:30").do(job)
# 每個(gè)小時(shí)運(yùn)行作業(yè),2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)
# 每個(gè)小時(shí)運(yùn)行作業(yè),8個(gè)小時(shí)后停止
schedule.every(1).hours.until(timedelta(hours=8)).do(job)
# 每個(gè)小時(shí)運(yùn)行作業(yè),11:32:42后停止
schedule.every(1).hours.until(time(11, 33, 42)).do(job)
# 每個(gè)小時(shí)運(yùn)行作業(yè),2020-5-17 11:36:20后停止
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)

截止日期之后,該作業(yè)將無法運(yùn)行。

立即運(yùn)行所有作業(yè),而不管其安排如何

如果某個(gè)機(jī)制觸發(fā)了,你需要立即運(yùn)行所有作業(yè),可以調(diào)用 schedule.run_all() :

import schedule
def job_1():
    print('Foo')
def job_2():
    print('Bar')
schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)
schedule.run_all()
# 立即運(yùn)行所有作業(yè),每次作業(yè)間隔10秒
schedule.run_all(delay_seconds=10)

3.高級使用

裝飾器安排作業(yè)

如果你覺得設(shè)定作業(yè)這種形式太啰嗦了,也可以使用裝飾器模式:

from schedule import every, repeat, run_pending
import time
# 此裝飾器效果等同于 schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")
while True:
    run_pending()
    time.sleep(1)

并行執(zhí)行

默認(rèn)情況下,Schedule 按順序執(zhí)行所有作業(yè)。其背后的原因是,很難找到讓每個(gè)人都高興的并行執(zhí)行模型。

不過你可以通過多線程的形式來運(yùn)行每個(gè)作業(yè)以解決此限制:

import threading
import time
import schedule
def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
    schedule.run_pending()
    time.sleep(1)

日志記錄

Schedule 模塊同時(shí)也支持 logging 日志記錄,這么使用:

import schedule
import logging
logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# 日志級別為DEBUG
schedule_logger.setLevel(level=logging.DEBUG)
def job():
    print("Hello, Logs")
schedule.every().second.do(job)
schedule.run_all()
schedule.clear()

效果如下:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs

異常處理

Schedule 不會自動(dòng)捕捉異常,它遇到異常會直接拋出,這會導(dǎo)致一個(gè)嚴(yán)重的問題:后續(xù)所有的作業(yè)都會被中斷執(zhí)行,因此我們需要捕捉到這些異常。

你可以手動(dòng)捕捉,但是某些你預(yù)料不到的情況需要程序進(jìn)行自動(dòng)捕獲,加一個(gè)裝飾器就能做到了:

import functools
def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator
@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0
schedule.every(5).minutes.do(bad_task)

這樣,bad_task 在執(zhí)行時(shí)遇到的任何錯(cuò)誤,都會被 catch_exceptions 捕獲,這點(diǎn)在保證調(diào)度任務(wù)正常運(yùn)轉(zhuǎn)的時(shí)候非常關(guān)鍵。

我們的文章到此就結(jié)束啦,如果你喜歡今天的Python 實(shí)戰(zhàn)教程,請持續(xù)關(guān)注。

以上就是python周期任務(wù)調(diào)度工具Schedule使用詳解的詳細(xì)內(nèi)容,更多關(guān)于周期任務(wù)調(diào)度工具Schedule的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 使用python半分鐘輕松完成證件照換底色

    使用python半分鐘輕松完成證件照換底色

    是不是很多小伙伴兒都不清楚公司是需要藍(lán)底還是紅底的證件照,今天小編直接帶大家做一款Python換底色的一款小程序,不管什么底色兒,放馬過來
    2021-09-09
  • Python中特殊函數(shù)集錦

    Python中特殊函數(shù)集錦

    這篇文章主要介紹了Python中特殊函數(shù),主要介紹一下四個(gè)函數(shù):1 過濾函數(shù)filter 2 映射和歸并函數(shù)map/reduce 3 裝飾器@ 4 匿名函數(shù)lamda,需要的朋友可以參考下
    2015-07-07
  • Python條件語句與循環(huán)語句

    Python條件語句與循環(huán)語句

    這篇文章主要介紹了Python條件語句與循環(huán)語句,條件語句就是通過指定的表達(dá)式的運(yùn)行結(jié)果來判斷當(dāng)前是執(zhí)行還是跳過某些指定的語句塊,循環(huán)語句就是對某些語句的重復(fù)執(zhí)行,這個(gè)重復(fù)執(zhí)行是通過指定表達(dá)式來控制的,下面來看具體內(nèi)容及續(xù)航管案例吧,需要的朋友可以參考一下
    2021-11-11
  • python解決字典中的值是列表問題的方法

    python解決字典中的值是列表問題的方法

    這篇文章主要介紹了字典中的值是列表問題,先用value連成一個(gè)str,最后用str.split()作一個(gè)轉(zhuǎn)換,生成一個(gè)列表.看了python cookbook,上面正好有一個(gè)recipe講到如何處理這樣的問題
    2013-03-03
  • Python3爬蟲爬取百姓網(wǎng)列表并保存為json功能示例【基于request、lxml和json模塊】

    Python3爬蟲爬取百姓網(wǎng)列表并保存為json功能示例【基于request、lxml和json模塊】

    這篇文章主要介紹了Python3爬蟲爬取百姓網(wǎng)列表并保存為json功能,涉及Python基于request、lxml和json模塊的Request請求與響應(yīng)數(shù)據(jù)處理相關(guān)操作技巧,需要的朋友可以參考下
    2018-12-12
  • Python WebSocket長連接心跳與短連接的示例

    Python WebSocket長連接心跳與短連接的示例

    這篇文章主要介紹了Python WebSocket長連接心跳與短連接的示例,幫助大家更好的理解和學(xué)習(xí)python,感興趣的朋友可以了解下
    2020-11-11
  • pandas 使用均值填充缺失值列的小技巧分享

    pandas 使用均值填充缺失值列的小技巧分享

    今天小編就為大家分享一篇pandas 使用均值填充缺失值列的小技巧分享,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2019-07-07
  • python里的條件語句和循環(huán)語句你了解多少

    python里的條件語句和循環(huán)語句你了解多少

    這篇文章主要為大家詳細(xì)介紹了python的條件語句和循環(huán)語句,使用數(shù)據(jù)庫,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • Python多進(jìn)程分塊讀取超大文件的方法

    Python多進(jìn)程分塊讀取超大文件的方法

    這篇文章主要介紹了Python多進(jìn)程分塊讀取超大文件的方法,涉及Python多進(jìn)程操作與文件分塊讀取的相關(guān)技巧,需要的朋友可以參考下
    2016-04-04
  • Django記錄操作日志與LogEntry的使用詳解

    Django記錄操作日志與LogEntry的使用詳解

    我們既知道如何記錄變更日志,也知道如何獲取變更日志,那么如何才能夠在admin后臺方便地查看操作日志呢?這篇文章主要給大家介紹了關(guān)于Django記錄操作日志與LogEntry使用的相關(guān)資料,需要的朋友可以參考下
    2022-01-01

最新評論