欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python定時任務(wù)apscheduler的詳細(xì)使用教程

 更新時間:2022年02月20日 08:00:25   作者:殿堂級退堂鼓藝術(shù)家  
APScheduler的全稱是Advanced?Python?Scheduler,它是一個輕量級的?Python定時任務(wù)調(diào)度框架,下面這篇文章主要給大家介紹了關(guān)于python定時任務(wù)apscheduler的詳細(xì)使用教程,需要的朋友可以參考下

前言

我們項目中總是避免不了要使用一些定時任務(wù),比如說最近的項目,用戶點擊報名考試以后需要在考試日期臨近的時候推送小程序消息提醒到客戶微信上,翻了翻 fastapi 中的實現(xiàn),雖然方法和包也不少,但是要不就是太重了(比如需要再開服務(wù),還要依賴 redis,都不好用),雖然也可以使用 time 模塊的 time.sleep()機(jī)上 fastapi 的后臺任務(wù)變相實現(xiàn),但是相對簡單的功能還行,復(fù)雜點的代碼起來就麻煩了,所以還是專人專事找個負(fù)責(zé)這個額的包吧。找來找去發(fā)現(xiàn) APScheduler 就挺適合,代碼簡單,實現(xiàn)效果也很好,這里做個記錄!

安裝

pip install apscheduler

主要組成部分

概念性東西,混個臉熟,代碼比這些定義好理解。

觸發(fā)器(trigger)包含調(diào)度邏輯,每一個作業(yè)有它自己的觸發(fā)器,用于決定接下來哪一個作業(yè)會運行。除了他們自己初始配置意外,觸發(fā)器完全是無狀態(tài)的。說人話就是你指定那種方式觸發(fā)當(dāng)前的任務(wù)。

類型解釋
DateTrigger到期執(zhí)行(到xxxx年x月x日 x時x分x秒執(zhí)行) 對應(yīng)DateTrigger
IntervalTrigger間隔執(zhí)行(每5秒執(zhí)行一次)
CronTrigger一個crontab類型的條件(這個比較復(fù)雜,比如周一到周四的4-5點每5秒執(zhí)行一次)

作業(yè)存儲(job store)存儲被調(diào)度的作業(yè),默認(rèn)的作業(yè)存儲是簡單地把作業(yè)保存在內(nèi)存中,其他的作業(yè)存儲是將作業(yè)保存在數(shù)據(jù)庫中。一個作業(yè)的數(shù)據(jù)講在保存在持久化作業(yè)存儲時被序列化,并在加載時被反序列化。調(diào)度器不能分享同一個作業(yè)存儲。

Jobstore在scheduler中初始化,另外也可通過scheduler的add_jobstore動態(tài)添加Jobstore。每個jobstore
都會綁定一個alias,scheduler在Add Job時,根據(jù)指定的jobstore在scheduler中找到相應(yīng)的jobstore,并
將job添加到j(luò)obstore中。

Jobstore主要是通過pickle庫的loads和dumps【實現(xiàn)核心是通過python的__getstate__和__setstate__重寫
實現(xiàn)】,每次變更時將Job動態(tài)保存到存儲中,使用時再動態(tài)的加載出來,作為存儲的可以是redis,也可以
是數(shù)據(jù)庫【通過sqlarchemy這個庫集成多種數(shù)據(jù)庫】,也可以是mongodb等
目前APScheduler支持的Jobstore:

MemoryJobStore
MongoDBJobStore
RedisJobStore
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore

執(zhí)行器(executor)處理作業(yè)的運行,他們通常通過在作業(yè)中提交制定的可調(diào)用對象到一個線程或者進(jìn)城池來進(jìn)行。當(dāng)作業(yè)完成時,執(zhí)行器將會通知調(diào)度器。

- 說人話就是添加任務(wù)時候用它來包裝的,executor的種類會根據(jù)不同的調(diào)度來選擇,如果選擇AsyncIO作為調(diào)度的庫,那么選擇AsyncIOExecutor,如果選擇tornado作為調(diào)度的庫,選擇TornadoExecutor,如果選擇啟動進(jìn)程作為調(diào)度,選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以Executor的選擇需要根據(jù)實際的scheduler來選擇不同的執(zhí)行器
目前APScheduler支持的Executor:
AsyncIOExecutor
GeventExecutor
ThreadPoolExecutor
ProcessPoolExecutor
TornadoExecutor
TwistedExecutor

調(diào)度器(scheduler)是其他的組成部分。你通常在應(yīng)用只有一個調(diào)度器,應(yīng)用的開發(fā)者通常不會直接處理作業(yè)存儲、調(diào)度器和觸發(fā)器,相反,調(diào)度器提供了處理這些的合適的接口。配置作業(yè)存儲和執(zhí)行器可以在調(diào)度器中完成,例如添加、修改和移除作業(yè).

Scheduler是APScheduler的核心,所有相關(guān)組件通過其定義。scheduler啟動之后,將開始按照配置的任務(wù)進(jìn)行調(diào)度。
除了依據(jù)所有定義Job的trigger生成的將要調(diào)度時間喚醒調(diào)度之外。當(dāng)發(fā)生Job信息變更時也會觸發(fā)調(diào)度。

scheduler可根據(jù)自身的需求選擇不同的組件,如果是使用AsyncIO則選擇AsyncIOScheduler,使用tornado則
選擇TornadoScheduler。
目前APScheduler支持的Scheduler:

AsyncIOScheduler
BackgroundScheduler
BlockingScheduler
GeventScheduler
QtScheduler
TornadoScheduler
TwistedScheduler

簡單應(yīng)用

import time
from apscheduler.schedulers.blocking import BlockingScheduler # 引入后臺

def my_job():
    print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5)
sched.start()

完整代碼

# trigeers 觸發(fā)器
# job stores job 存儲
# executors 執(zhí)行器
# schedulers 調(diào)度器

from pytz import utc
from sqlalchemy import func

from apscheduler.schedulers.background import BackgroundScheduler,AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    # 可以配置多個存儲
    #'mongo': {'type': 'mongodb'}, 
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore指定存儲鏈接
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},     # 最大工作線程數(shù)20
    'processpool': ProcessPoolExecutor(max_workers=5)         # 最大工作進(jìn)程數(shù)為5
}
job_defaults = {
    'coalesce': False,   # 關(guān)閉新job的合并,當(dāng)job延誤或者異常原因未執(zhí)行時
    'max_instances': 3   # 并發(fā)運行新job默認(rèn)最大實例多少
}
scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) # utc作為調(diào)度程序的時區(qū)


import os
import time

def print_time(name):
    print(f'{name} - {time.ctime()}')

def add_job(job_id, func, args, seconds):
    """添加job"""
    print(f"添加間隔執(zhí)行任務(wù)job - {job_id}")
    scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)

def add_coun_job(job_id, func, args, start_time):
    """添加job"""
    print(f"添加一次執(zhí)行任務(wù)job - {job_id}")
    scheduler.add_job(id=job_id, func=func, args=args, trigger='date',timezone='Asia/Shanghai', run_date=start_time)
    # scheduler.add_job(func=print_time, trigger='date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2'])

def remove_job(job_id):
    """移除job"""
    scheduler.remove_job(job_id)
    print(f"移除job - {job_id}")

def pause_job(job_id):
    """停止job"""
    scheduler.pause_job(job_id)
    print(f"停止job - {job_id}")

def resume_job(job_id):
    """恢復(fù)job"""
    scheduler.resume_job(job_id)
    print(f"恢復(fù)job - {job_id}")

def get_jobs():
    """獲取所有job信息,包括已停止的"""
    res = scheduler.get_jobs()
    print(f"所有job - {res}")

def print_jobs():
    print(f"詳細(xì)job信息")
    scheduler.print_jobs()

def start():
    """啟動調(diào)度器"""
    scheduler.start()

def shutdown():
    """關(guān)閉調(diào)度器"""
    scheduler.shutdown()

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    # start()
    # print('Press Ctrl+{0} to exit \n'.format('Break' if os.name == 'nt' else 'C'))
    # add_job('job_A', func=print_time, args=("A", ), seconds=1)
    # add_job('job_B', func=print_time, args=("B", ), seconds=2)
    # time.sleep(6)
    # pause_job('job_A') # 停止a
    # get_jobs()   #得到所有job
    # time.sleep(6)
    # print_jobs()
    # resume_job('job_A')
    # time.sleep(6)
    # remove_job('job_A')
    # time.sleep(6)
    from datetime import datetime
    import pytz
    start()
    
    date_temp = datetime(2022, 2, 19, 17, 30, 5)
    # scheduler.add_job(print_time, 'date', run_date=datetime.now(pytz.timezone('America/Manaus')), args=['text'])
    # scheduler.add_job(print_time, 'date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2'])
    add_coun_job(job_id="job_C",func=print_time,args=('一次性執(zhí)行任務(wù)',),start_time=datetime(2022, 2, 19, 18, 4, 0).astimezone())
    time.sleep(130)
    try:
        shutdown()
    except RuntimeError:
        pass

參考:http://www.dbjr.com.cn/article/232500.htm

總結(jié)

到此這篇關(guān)于python定時任務(wù)apscheduler使用的文章就介紹到這了,更多相關(guān)python定時任務(wù)apscheduler使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論