欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python中實(shí)現(xiàn)定時(shí)任務(wù)的幾種方案

 更新時(shí)間:2023年10月06日 09:46:18   作者:coder Ethan  
本文呢給大家總結(jié)以下幾種方案實(shí)現(xiàn)定時(shí)任務(wù),可根據(jù)不同需求去使用不同方案,文章通過代碼示例介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴可以參考下

while True: + sleep()

利用while True的死循環(huán),加上 sleep()函數(shù)讓其暫停一段時(shí)間,達(dá)到每隔一段時(shí)間執(zhí)行特定任務(wù)的目的。

比較簡(jiǎn)單,例子如下:

import datetime
import time
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
def loop_monitor():
    while True:
        time_printer()
        time.sleep(5)
if __name__ == "__main__":
    loop_monitor()

主要缺點(diǎn):

  • 只能設(shè)定間隔,不能指定具體的時(shí)間
  • sleep 是一個(gè)阻塞函數(shù),也就是說 sleep 這一段時(shí)間,程序什么也不能操作。

Timeloop庫(kù)

Timeloop是一個(gè)庫(kù),可用于運(yùn)行多周期任務(wù)。

Timeloop內(nèi)部維護(hù)了一個(gè)任務(wù)列表jobs,用來管理所有任務(wù)。

可以使用裝飾器標(biāo)記任務(wù),這樣就會(huì)把任務(wù)加到任務(wù)列表jobs中,使用start方法啟動(dòng)任務(wù)列表重的所有任務(wù)。

示例如下:

import time
from timeloop import Timeloop
from datetime import timedelta
tl = Timeloop()
@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
    print("2s job current time : {}".format(time.ctime()))
if __name__ == "__main__":
    tl.start(block=True)

運(yùn)行后打印如下:

[2023-10-02 09:48:41,926] [timeloop] [INFO] Starting Timeloop..
[2023-10-02 09:48:41,926] [timeloop] [INFO] Registered job <function sample_job_every_2s at 0x7fc3d022d0d0>
[2023-10-02 09:48:41,926] [timeloop] [INFO] Timeloop now started. Jobs will run based on the interval set
2s job current time : Mon Oct  2 09:48:43 2023
2s job current time : Mon Oct  2 09:48:45 2023
2s job current time : Mon Oct  2 09:48:47 2023

同時(shí)Timeloop還有個(gè)stop方法,可以用來暫停所有任務(wù)。

threading.Timer

threading 模塊中的 Timer 是一個(gè)非阻塞函數(shù),比 sleep 稍好一點(diǎn),timer最基本理解就是定時(shí)器,我們可以啟動(dòng)多個(gè)定時(shí)任務(wù),這些定時(shí)器任務(wù)是異步執(zhí)行,所以不存在等待順序執(zhí)行問題。

主要有如下方法:

方法說明
Timer(interval, function, args=None, kwargs=None)創(chuàng)建定時(shí)器
cancel()取消定時(shí)器
start()使用線程方式執(zhí)行
join(self, timeout=None)主線程等待線程執(zhí)行結(jié)束

示例:

import datetime
from threading import Timer
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    # 注意 Timer 只能執(zhí)行一次,這里需要循環(huán)調(diào)用,否則只能執(zhí)行一次
    loop_monitor()
def loop_monitor():
    t = Timer(5, time_printer)
    t.start()
if __name__ == "__main__":
    loop_monitor()

sched模塊

sched模塊實(shí)現(xiàn)了一個(gè)通用事件調(diào)度器,在調(diào)度器類使用一個(gè)延遲函數(shù)等待特定的時(shí)間,執(zhí)行任務(wù)。同時(shí)支持多線程應(yīng)用程序,在每個(gè)任務(wù)執(zhí)行后會(huì)立刻調(diào)用延時(shí)函數(shù),以確保其他線程也能執(zhí)行。

class sched.scheduler(timefunc, delayfunc) 這個(gè)類定義了調(diào)度事件的通用接口,它需要外部傳入兩個(gè)參數(shù),timefunc是一個(gè)沒有參數(shù)的返回時(shí)間類型數(shù)字的函數(shù)(常用使用的如time模塊里面的time),delayfunc應(yīng)該是一個(gè)需要一個(gè)參數(shù)來調(diào)用、與timefunc的輸出兼容、并且作用為延遲多個(gè)時(shí)間單位的函數(shù)(常用的如time模塊的sleep)。

import datetime
import time
import sched
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    loop_monitor()
def loop_monitor():
    s = sched.scheduler(time.time, time.sleep)  # 生成調(diào)度器
    s.enter(5, 1, time_printer, ())
    s.run()
if __name__ == "__main__":
    loop_monitor()

scheduler對(duì)象主要方法:

enter(delay, priority, action, argument),安排一個(gè)事件來延遲delay個(gè)時(shí)間單位。

cancel(event):從隊(duì)列中刪除事件。如果事件不是當(dāng)前隊(duì)列中的事件,則該方法將跑出一個(gè)ValueError。

run():運(yùn)行所有預(yù)定的事件。這個(gè)函數(shù)將等待(使用傳遞給構(gòu)造函數(shù)的delayfunc()函數(shù)),然后執(zhí)行事件,直到不再有預(yù)定的事件。比threading.Timer更好,不需要循環(huán)調(diào)用。

schedule模塊

schedule是一個(gè)第三方輕量級(jí)的任務(wù)調(diào)度模塊,可以按照秒,分,小時(shí),日期或者自定義事件執(zhí)行時(shí)間。schedule允許用戶使用簡(jiǎn)單、人性化的語(yǔ)法以預(yù)定的時(shí)間間隔定期運(yùn)行Python函數(shù)(或其它可調(diào)用函數(shù))。

示例:

import schedule
import time
def job():
    print("I'm working...")
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
    schedule.run_pending()
    time.sleep(1)

也可以通過 @repeat() 裝飾靜態(tài)方法:

import time
from schedule import every, repeat, run_pending
@repeat(every().second)
def job():
    print('working...')
while True:
    run_pending()
    time.sleep(1)

傳遞參數(shù):

import schedule
def greet(name):
    print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
while True:
    schedule.run_pending()

裝飾器同樣能傳遞參數(shù):

from schedule import every, repeat, run_pending
@repeat(every().second, 'World')
@repeat(every().minute, 'Mars')
def hello(planet):
    print('Hello', planet)
while True:
    run_pending()

取消任務(wù):

import schedule
i = 0
def some_task():
    global i
    i += 1
    print(i)
    if i == 10:
        schedule.cancel_job(job)
        print('cancel job')
        exit(0)
job = schedule.every().second.do(some_task)
while True:
    schedule.run_pending()

運(yùn)行一次任務(wù):

import time
import schedule
def job_that_executes_once():
    print('Hello')
    return schedule.CancelJob
schedule.every().minute.at(':34').do(job_that_executes_once)
while True:
    schedule.run_pending()
    time.sleep(1)

根據(jù)標(biāo)簽檢索任務(wù):

# 檢索所有任務(wù):schedule.get_jobs()
import schedule
def greet(name):
    print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
print(friends)

根據(jù)標(biāo)簽取消任務(wù):

# 取消所有任務(wù):schedule.clear()
import schedule
def greet(name):
    print('Hello {}'.format(name))
    if name == 'Cancel':
        schedule.clear('second-tasks')
        print('cancel second-tasks')
schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')
while True:
    schedule.run_pending()

運(yùn)行任務(wù)到某時(shí)間:

import schedule
from datetime import datetime, timedelta, time
def job():
    print('working...')
schedule.every().second.until('23:59').do(job)  # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小時(shí)后停止
schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30停止
while True:
    schedule.run_pending()

馬上運(yùn)行所有任務(wù)(主要用于測(cè)試):

import schedule
def job():
    print('working...')
def job1():
    print('Hello...')
schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3)  # 任務(wù)間延遲3秒

并行運(yùn)行:使用 Python 內(nèi)置隊(duì)列實(shí)現(xiàn):

import threading
import time
import schedule
def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
    schedule.run_pending()
    time.sleep(1)

APScheduler框架

APScheduler(advanceded python scheduler)基于Quartz的一個(gè)Python定時(shí)任務(wù)框架,實(shí)現(xiàn)了Quartz的所有功能,使用起來十分方便。提供了基于日期、固定時(shí)間間隔以及crontab類型的任務(wù),并且可以持久化任務(wù)。基于這些功能,我們可以很方便的實(shí)現(xiàn)一個(gè)Python定時(shí)任務(wù)系統(tǒng)。

Celery框架

Celery是一個(gè)簡(jiǎn)單,靈活,可靠的分布式系統(tǒng),用于處理大量消息,同時(shí)為操作提供維護(hù)此類系統(tǒng)所需的工具, 也可用于任務(wù)調(diào)度。Celery 的配置比較麻煩,如果你只是需要一個(gè)輕量級(jí)的調(diào)度工具,Celery 不會(huì)是一個(gè)好選擇。

Celery 是一個(gè)強(qiáng)大的分布式任務(wù)隊(duì)列,它可以讓任務(wù)的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機(jī)上運(yùn)行。通常使用它來實(shí)現(xiàn)異步任務(wù)(async task)和定時(shí)任務(wù)(crontab)。異步任務(wù)比如是發(fā)送郵件、或者文件上傳, 圖像處理等等一些比較耗時(shí)的操作 ,定時(shí)任務(wù)是需要在特定時(shí)間執(zhí)行的任務(wù)。

數(shù)據(jù)流工具Apache Airflow

概述

Apache Airflow 是Airbnb開源的一款數(shù)據(jù)流程工具,目前是Apache孵化項(xiàng)目。以非常靈活的方式來支持?jǐn)?shù)據(jù)的ETL過程,同時(shí)還支持非常多的插件來完成諸如HDFS監(jiān)控、郵件通知等功能。Airflow支持單機(jī)和分布式兩種模式,支持Master-Slave模式,支持Mesos等資源調(diào)度,有非常好的擴(kuò)展性。被大量公司采用。

Airflow使用Python開發(fā),它通過DAGs(Directed Acyclic Graph, 有向無(wú)環(huán)圖)來表達(dá)一個(gè)工作流中所要執(zhí)行的任務(wù),以及任務(wù)之間的關(guān)系和依賴。比如,如下的工作流中,任務(wù)T1執(zhí)行完成,T2和T3才能開始執(zhí)行,T2和T3都執(zhí)行完成,T4才能開始執(zhí)行。

在這里插入圖片描述

Airflow提供了各種Operator實(shí)現(xiàn),可以完成各種任務(wù)實(shí)現(xiàn):

  • BashOperator – 執(zhí)行 bash 命令或腳本。
  • SSHOperator – 執(zhí)行遠(yuǎn)程 bash 命令或腳本(原理同 paramiko 模塊)。
  • PythonOperator – 執(zhí)行 Python 函數(shù)。EmailOperator – 發(fā)送 Email。
  • HTTPOperator – 發(fā)送一個(gè) HTTP 請(qǐng)求。
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,執(zhí)行 SQL 任務(wù)。
  • DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…

除了以上這些 Operators 還可以方便的自定義 Operators 滿足個(gè)性化的任務(wù)需求。

一些情況下,我們需要根據(jù)執(zhí)行結(jié)果執(zhí)行不同的任務(wù),這樣工作流會(huì)產(chǎn)生分支。如:

在這里插入圖片描述

這種需求可以使用BranchPythonOperator來實(shí)現(xiàn)。

Airflow 核心概念

  • DAGs:即有向無(wú)環(huán)圖(Directed Acyclic Graph),將所有需要運(yùn)行的tasks按照依賴關(guān)系組織起來,描述的是所有tasks執(zhí)行順序。
  • Operators:可以簡(jiǎn)單理解為一個(gè)class,描述了DAG中某個(gè)的task具體要做的事。其中,airflow內(nèi)置了很多operators,如BashOperator 執(zhí)行一個(gè)bash 命令,PythonOperator 調(diào)用任意的Python 函數(shù),EmailOperator 用于發(fā)送郵件,HTTPOperator 用于發(fā)送HTTP請(qǐng)求, SqlOperator 用于執(zhí)行SQL命令等等,同時(shí),用戶可以自定義Operator,這給用戶提供了極大的便利性。
  • Tasks:Task 是 Operator的一個(gè)實(shí)例,也就是DAGs中的一個(gè)node。
  • Task Instance:task的一次運(yùn)行。Web 界面中可以看到task instance 有自己的狀態(tài),包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
  • Task Relationships:DAGs中的不同Tasks之間可以有依賴關(guān)系,如 Task1 >> Task2,表明Task2依賴于Task2了。通過將DAGs和Operators結(jié)合起來,用戶就可以創(chuàng)建各種復(fù)雜的 工作流(workflow)。

Airflow 的架構(gòu)

在一個(gè)可擴(kuò)展的生產(chǎn)環(huán)境中,Airflow 含有以下組件:

  • 元數(shù)據(jù)庫(kù):這個(gè)數(shù)據(jù)庫(kù)存儲(chǔ)有關(guān)任務(wù)狀態(tài)的信息。
  • 調(diào)度器:Scheduler 是一種使用 DAG 定義結(jié)合元數(shù)據(jù)中的任務(wù)狀態(tài)來決定哪些任務(wù)需要被執(zhí)行以及任務(wù)執(zhí)行優(yōu)先級(jí)的過程。調(diào)度器通常作為服務(wù)運(yùn)行。
  • 執(zhí)行器:Executor 是一個(gè)消息隊(duì)列進(jìn)程,它被綁定到調(diào)度器中,用于確定實(shí)際執(zhí)行每個(gè)任務(wù)計(jì)劃的工作進(jìn)程。有不同類型的執(zhí)行器,每個(gè)執(zhí)行器都使用一個(gè)指定工作進(jìn)程的類來執(zhí)行任務(wù)。例如,LocalExecutor 使用與調(diào)度器進(jìn)程在同一臺(tái)機(jī)器上運(yùn)行的并行進(jìn)程執(zhí)行任務(wù)。其他像 CeleryExecutor 的執(zhí)行器使用存在于獨(dú)立的工作機(jī)器集群中的工作進(jìn)程執(zhí)行任務(wù)。
  • Workers:這些是實(shí)際執(zhí)行任務(wù)邏輯的進(jìn)程,由正在使用的執(zhí)行器確定。

在這里插入圖片描述

Worker的具體實(shí)現(xiàn)由配置文件中的executor來指定,airflow支持多種Executor:

  • SequentialExecutor: 單進(jìn)程順序執(zhí)行,一般只用來測(cè)試
  • LocalExecutor: 本地多進(jìn)程執(zhí)行
  • CeleryExecutor: 使用Celery進(jìn)行分布式任務(wù)調(diào)度
  • DaskExecutor:使用Dask進(jìn)行分布式任務(wù)調(diào)度
  • KubernetesExecutor: 1.10.0新增, 創(chuàng)建臨時(shí)POD執(zhí)行每次任務(wù)

生產(chǎn)環(huán)境一般使用CeleryExecutor和KubernetesExecutor。

使用CeleryExecutor的架構(gòu)如圖:

在這里插入圖片描述

使用KubernetesExecutor的架構(gòu)如圖:

在這里插入圖片描述

以上就是python中實(shí)現(xiàn)定時(shí)任務(wù)的幾種方案的詳細(xì)內(nèi)容,更多關(guān)于python實(shí)現(xiàn)定時(shí)任務(wù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論