欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python中實現(xiàn)定時任務(wù)詳解

 更新時間:2023年07月13日 08:47:39   作者:hd092336  
這篇文章主要介紹了Python中實現(xiàn)定時任務(wù)詳解的相關(guān)資料,需要的朋友可以參考下

在項目中,我們可能遇到有定時任務(wù)的需求。

  • 其一:每隔一個時間段就執(zhí)行任務(wù)。
    比如:壓測中每隔45分鐘調(diào)整溫箱的溫度。
  • 其二:定時執(zhí)行任務(wù)。
    例如每天早上 8 點定時推送早報。

今天,我跟大家分享下 Python 定時任務(wù)的實現(xiàn)方法。

固定時間間隔執(zhí)行任務(wù)

import time
import logging

logging.basicConfig(
    level=logging.debug,
    format="%(asctime)s.%(msecs)d | %(threadName)s | %(levelname)s - %(message)s"
)


def task():
    logging.info("Task Start.")
    time.sleep(1)
    logging.info("Task Done.")

time.sleep

第一種辦法是最簡單又最暴力。
那就是在一個死循環(huán)中,使用線程睡眠函數(shù) sleep()。

while True:
    task()
    time.sleep(5)

上述的方法有幾個問題:

  • 阻塞主進程,這個也好解決,可以放到線程中去執(zhí)行
  • 一次只有一個task,這個也有解決辦法,可以多啟幾個線程

threading.Timer

既然第一種方法暴力,那么有沒有比較優(yōu)雅點的方法?
Python 標(biāo)準(zhǔn)庫 threading 中有個 Timer 類。
它會新啟動一個線程來執(zhí)行定時任務(wù),所以它是非阻塞函式。

原理:線程中預(yù)置一個finished的事件,通過finished.wait等待固定時間間隔。
超時則執(zhí)行任務(wù)。如果需要取消任務(wù),可以調(diào)用Timer.cancel來取消任務(wù)。

from threading import Timer

t = Timer(task, 5)
t.start()

優(yōu)點:
不阻塞主進程,task在線程中執(zhí)行

缺點:
一個Timer只能執(zhí)行一次task就結(jié)束了。
如果想循環(huán),需要改造一下task函數(shù)。

from threading import Timer

def repeat_task():
    t = Timer(5, repeat_task)
    # 開始任務(wù)的位置決定了是任務(wù)之間等待固定間隔時間
    # 還是每個任務(wù)的開始等待固定間隔時間
    t.start()
    task()

這樣可以循環(huán)執(zhí)行,但是仍然只能一個線程一個任務(wù)。

Q:如何跳出循環(huán)
A:加入一個標(biāo)識符:

  • sleep可以用一個布爾值來控制
  • Timer可以用一個Event來控制

固定時間點執(zhí)行任務(wù)

以上是用內(nèi)置的方法中比較簡單的實現(xiàn)方式。簡單的功能可以實現(xiàn)。

前面執(zhí)行的都是指定間隔時間的定時任務(wù),那怎么執(zhí)行指定時間點的任務(wù)呢?

上面的方法是可以做到的。有兩種思路,

  • 前面我們指定了間隔時間,那指定時間點,就先計算當(dāng)前時間到指定時間點的間隔時間
  • 不管間隔時間,而是記錄任務(wù)時間點,然后實時去檢查是否到達(dá)指定時間點

思路有了,但是對于固定時間點執(zhí)行任務(wù)的場景以及后面更復(fù)雜的場景,自己實現(xiàn)可能就變得復(fù)雜。

下面再介紹幾個進階的定時任務(wù)的實現(xiàn)方式,可以適應(yīng)更復(fù)雜的業(yè)務(wù)場景。

sched

第三種方式是使用標(biāo)準(zhǔn)庫中sched模塊。sched 是事件調(diào)度器,
它通過 scheduler 類來調(diào)度事件,從而達(dá)到定時執(zhí)行任務(wù)的效果。

簡單示例如下:

import sched

schedule = sched.scheduler()  # 初始化 sched 模塊的 scheduler 類
schedule.enter(10, 1, task)  # 增加調(diào)度任務(wù)
schedule.run()  # 開始調(diào)度任務(wù)

scheduler 提供了兩個添加調(diào)度任務(wù)的函數(shù):

  • enter(delay, priority, action, argument=(), kwargs={})該函數(shù)可以延遲一定時間執(zhí)行任務(wù)。delay 表示延遲多長時間執(zhí)行任務(wù),單位是秒。
    priority為優(yōu)先級,越小優(yōu)先級越大。兩個任務(wù)指定相同的延遲時間,優(yōu)先級大的任務(wù)會向被執(zhí)行。
    action 即需要執(zhí)行的函數(shù),argument 和 kwargs 分別是函數(shù)的位置和關(guān)鍵字參數(shù)。

  • scheduler.enterabs(time, priority, action, argument=(), kwargs={})添加一項任務(wù),但這個任務(wù)會在 time 這時刻執(zhí)行。因此,time 是絕對時間。其他參數(shù)用法與 enter() 中的參數(shù)用法是一致。

優(yōu)點:

  • 執(zhí)行時間間隔和時間點執(zhí)行任務(wù)
  • 可以添加不同的任務(wù)
  • 任務(wù)可以設(shè)置優(yōu)先級

缺點:
scheduler 中的每個調(diào)度任務(wù)只會工作一次,不會無限循環(huán)被調(diào)用。如果想重復(fù)執(zhí)行同一任務(wù), 需要重復(fù)添加調(diào)度任務(wù)即可。

import sched
import threading
from functools import wraps

s = sched.scheduler()
STOP_FLAG = threading.Event()

def repeat(interval):
    def wrapper(func):
        @wraps(func)
        def inner():
            if not STOP_FLAG.is_set():
                s.enter(interval, 0, inner)
            func()
        return inner

    return wrapper


@repeat(5)
def new_task():
    return task()

new_task()
t = threading.Thread(target=s.run)
t.start()

實現(xiàn)原理

當(dāng)然我們僅僅學(xué)會怎么用還是不夠的,不能知其然而不知其所以然。
介紹了那么多方法,那么多庫,但是底層的實現(xiàn)邏輯都是差不多的。
一個完整的定時任務(wù)系統(tǒng),有三個部分:

  • 任務(wù)隊列(task queue)
    根據(jù)執(zhí)行時間和優(yōu)先級進行排序
    排序sorted(): 任務(wù)一多,整個隊列重排,性能堪憂
    heapq.pop():堆排序,只獲取最高優(yōu)先級的任務(wù),不重復(fù)排序
  • 調(diào)度器(scheduler)
  • 執(zhí)行器(executor)
    從前到后,實現(xiàn)的內(nèi)容越來越多,控制的精細(xì)度也就越來越高,實現(xiàn)的功能也就越來越豐富。
import sched
import time
import threading
from functools import wraps

from task import task
from typing import Callable

STOP_FLAG = threading.Event()


def timeloop(func: Callable, interval: int):
    while True:
        func()
        time.sleep(interval)


def repeat_task(func: Callable, interval: int):
    # 新建任務(wù)的前后,決定了是在任務(wù)結(jié)束后等待時間,還是固定間隔
    if not STOP_FLAG.is_set():
        t = threading.Timer(interval, repeat_task, args=(func, interval))
        t.start()
    func()


def timer_schedule(func: Callable, interval: int):
    repeat_task(func, interval)


s = sched.scheduler()


def repeat(interval):
    def wrapper(func):
        @wraps(func)
        def inner():
            if not STOP_FLAG.is_set():
                s.enter(interval, 0, inner)
            func()
        return inner

    return wrapper


@repeat(5)
def new_task():
    return task()


def sched_schedule(func: Callable, interval: int):
    func()
    t = threading.Thread(target=s.run)
    t.start()
    return t


def main(schedule):
    schedule_thread = schedule(new_task, 5)
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            STOP_FLAG.set()
            schedule_thread.join()
            break


if __name__ == "__main__":
    # main(timeloop)
    # main(timer_schedule)
    main(sched_schedule)

到此這篇關(guān)于Python中實現(xiàn)定時任務(wù)詳解的文章就介紹到這了,更多相關(guān)Python中實現(xiàn)定時任務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論