欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python第三方模塊apscheduler安裝和基本使用

 更新時(shí)間:2023年03月05日 16:05:20   作者:小胖_@  
本文主要介紹了Python第三方模塊apscheduler安裝和基本使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

apscheduler 模塊

安裝apscheduler 模塊

pip install apscheduler

apscheduler 模塊介紹

APScheduler(Advanced Python Scheduler)是一個(gè)輕量級的Python定時(shí)任務(wù)調(diào)度框架(Python庫)。

APScheduler有三個(gè)內(nèi)置的調(diào)度系統(tǒng),其中包括:

  • cron式調(diào)度(可選開始/結(jié)束時(shí)間)
  • 基于間隔的執(zhí)行(以偶數(shù)間隔運(yùn)行作業(yè),也可以選擇開始/結(jié)束時(shí)間)
  • 一次性延遲執(zhí)行任務(wù)(在指定的日期/時(shí)間內(nèi)運(yùn)行作業(yè)一次)

支持的后端存儲作業(yè)

APScheduler可以任意混合和匹配調(diào)度系統(tǒng)和作業(yè)存儲的后端,其中支持后端存儲作業(yè)包括:

  • Memory
  • SQLAlchemy
  • MongoDB
  • Redis
  • RethinkDB
  • ZooKeeper

APScheduler有四種組成部分

  • triggers(觸發(fā)器)中包含調(diào)度邏輯,每個(gè)作業(yè)都由自己的觸發(fā)器來決定下次運(yùn)行時(shí)間。除了他們自己初始配置意外,觸發(fā)器完全是無狀態(tài)的。
  • job stores(作業(yè)存儲器)存儲被調(diào)度的作業(yè),默認(rèn)的作業(yè)存儲器只是簡單地把作業(yè)保存在內(nèi)存中,其他的作業(yè)存儲器則是將作業(yè)保存在數(shù)據(jù)庫中。當(dāng)作業(yè)被保存到一個(gè)持久化的作業(yè)存儲器中的時(shí)候,該作業(yè)的數(shù)據(jù)會被序列化,并在加載時(shí)被反序列化。作業(yè)存儲器不能共享調(diào)度器。
  • executors(執(zhí)行器)處理作業(yè)的運(yùn)行,他們通常通過在作業(yè)中提交指定的可調(diào)用對象到一個(gè)線程或者進(jìn)程池來進(jìn)行。當(dāng)作業(yè)完成時(shí),執(zhí)行器將會通知調(diào)度器。
  • schedulers(調(diào)度器)配置作業(yè)存儲器和執(zhí)行器可以在調(diào)度器中完成,例如添加、修改和移除作業(yè)。根據(jù)不同的應(yīng)用場景可以選用不同的調(diào)度器,可選的有BlockingScheduler,BackgroundScheduler,AsyncIOScheduler,GeventScheduler,TornadoScheduler,TwistedScheduler,QtScheduler 7種。

各組件簡介

觸發(fā)器

當(dāng)你調(diào)度作業(yè)的時(shí)候,你需要為這個(gè)作業(yè)選擇一個(gè)觸發(fā)器,用來描述這個(gè)作業(yè)何時(shí)被觸發(fā),APScheduler有三種內(nèi)置的觸發(fā)器類型:

  • date: 一次性指定日期;
  • interval: 在某個(gè)時(shí)間范圍內(nèi)間隔多長時(shí)間執(zhí)行一次;
  • cron :Linux crontab格式兼容,最為強(qiáng)大。

date 最基本的一種調(diào)度,作業(yè)只會執(zhí)行一次。它的參數(shù)如下:

1.run_date
(datetime|str) – 作業(yè)的運(yùn)行日期或時(shí)間

2.timezone
(datetime.tzinfo|str) – 指定時(shí)區(qū)

作業(yè)存儲器

如果你的應(yīng)用在每次啟動的時(shí)候都會重新創(chuàng)建作業(yè),那么使用默認(rèn)的作業(yè)存儲器(MemoryJobStore)即可,但是如果你需要在調(diào)度器重啟或者應(yīng)用程序奔潰的情況下任然保留作業(yè),你應(yīng)該根據(jù)你的應(yīng)用環(huán)境來選擇具體的作業(yè)存儲器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多數(shù)RDBMS)

執(zhí)行器

對執(zhí)行器的選擇取決于你使用上面哪些框架,大多數(shù)情況下,使用默認(rèn)的ThreadPoolExecutor已經(jīng)能夠滿足需求。如果你的應(yīng)用涉及到CPU密集型操作,你可以考慮使用ProcessPoolExecutor來使用更多的CPU核心。你也可以同時(shí)使用兩者,將ProcessPoolExecutor作為第二執(zhí)行器。

選擇合適的調(diào)度器

  • BlockingScheduler : 當(dāng)調(diào)度器是你應(yīng)用中唯一要運(yùn)行的東西時(shí)
  • BackgroundScheduler : 當(dāng)你沒有運(yùn)行任何其他框架并希望調(diào)度器在你應(yīng)用的后臺執(zhí)行時(shí)使用。
  • AsyncIOScheduler : 當(dāng)你的程序使用了asyncio(一個(gè)異步框架)的時(shí)候使用。
  • GeventScheduler : 當(dāng)你的程序使用了gevent(高性能的Python并發(fā)框架)的時(shí)候使用。
  • TornadoScheduler : 當(dāng)你的程序基于Tornado(一個(gè)web框架)的時(shí)候使用。
  • TwistedScheduler : 當(dāng)你的程序使用了Twisted(一個(gè)異步框架)的時(shí)候使用
  • QtScheduler : 如果你的應(yīng)用是一個(gè)Qt應(yīng)用的時(shí)候可以使用。

apscheduler 模塊使用

添加作業(yè)

有兩種方式可以添加一個(gè)新的作業(yè):

add_job來添加作業(yè);

# -*- coding:utf-8 -*-
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def my_job1():
    print('my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

def my_job2():
    print('my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

sched = BlockingScheduler()
# 每隔5秒運(yùn)行一次my_job1
sched.add_job(my_job1, 'interval', seconds=5, id='my_job1')
# 每隔5秒運(yùn)行一次my_job2
sched.add_job(my_job2, 'cron', second='*/5', id='my_job2')
sched.start()

裝飾器模式添加作業(yè)。

# -*- coding:utf-8 -*-
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

sched = BlockingScheduler()

# 每隔5秒運(yùn)行一次my_job1
@sched.scheduled_job('interval', seconds=5, id='my_job1')
def my_job1():
    print('my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# 每隔5秒運(yùn)行一次my_job2
@sched.scheduled_job('cron', second='*/5', id='my_job2')
def my_job2():
    print('my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

sched.start()

移除作業(yè)

沒有移除作業(yè)

# -*- coding:utf-8 -*-
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def my_job(text=""):
    print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

sched = BlockingScheduler()
job = sched.add_job(my_job, 'interval', seconds=2, args=['第一個(gè)作業(yè)'])
# #如果有多個(gè)任務(wù)序列的話可以給每個(gè)任務(wù)設(shè)置ID號,可以根據(jù)ID號選擇清除對象,且remove放到start前才有效
sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二個(gè)作業(yè)'])
sched.start()

代碼執(zhí)行結(jié)果:

在這里插入圖片描述

使用remove() 移除作業(yè)

# -*- coding:utf-8 -*-
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def my_job(text=""):
    print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

sched = BlockingScheduler()
job = sched.add_job(my_job, 'interval', seconds=2, args=['第一個(gè)作業(yè)'])
job.remove()
# #如果有多個(gè)任務(wù)序列的話可以給每個(gè)任務(wù)設(shè)置ID號,可以根據(jù)ID號選擇清除對象,且remove放到start前才有效
sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二個(gè)作業(yè)'])
sched.start()

代碼執(zhí)行結(jié)果:

在這里插入圖片描述

使用remove_job()移除作業(yè)

# -*- coding:utf-8 -*-
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def my_job(text=""):
    print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

sched = BlockingScheduler()
job = sched.add_job(my_job, 'interval', seconds=2, args=['第一個(gè)作業(yè)'])
# #如果有多個(gè)任務(wù)序列的話可以給每個(gè)任務(wù)設(shè)置ID號,可以根據(jù)ID號選擇清除對象,且remove放到start前才有效
sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二個(gè)作業(yè)'])
sched.remove_job('my_job_id')
sched.start()

代碼執(zhí)行結(jié)果:

在這里插入圖片描述

觸發(fā)器類型

APScheduler有3中內(nèi)置的觸發(fā)器類型:

  • 新建一個(gè)調(diào)度器(scheduler);
  • 添加一個(gè)調(diào)度任務(wù)(job store);
  • 運(yùn)行調(diào)度任務(wù)。

代碼實(shí)現(xiàn)

# -*- coding:utf-8 -*-
import time
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def my_job(text="默認(rèn)值"):
    print(text, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=3, args=['3秒定時(shí)'])
# 2018-3-17 00:00:00 執(zhí)行一次,args傳遞一個(gè)text參數(shù)
sched.add_job(my_job, 'date', run_date=datetime.date(2019, 10, 17), args=['根據(jù)年月日定時(shí)執(zhí)行'])
# 2018-3-17 13:46:00 執(zhí)行一次,args傳遞一個(gè)text參數(shù)
sched.add_job(my_job, 'date', run_date=datetime.datetime(2019, 10, 17, 14, 10, 0), args=['根據(jù)年月日時(shí)分秒定時(shí)執(zhí)行'])
# sched.start()
"""
interval 間隔調(diào)度,參數(shù)如下:
    weeks (int) – 間隔幾周
    days (int) – 間隔幾天
    hours (int) – 間隔幾小時(shí)
    minutes (int) – 間隔幾分鐘
    seconds (int) – 間隔多少秒
    start_date (datetime|str) – 開始日期
    end_date (datetime|str) – 結(jié)束日期
    timezone (datetime.tzinfo|str) – 時(shí)區(qū)
"""
"""
cron參數(shù)如下:
    year (int|str) – 年,4位數(shù)字
    month (int|str) – 月 (范圍1-12)
    day (int|str) – 日 (范圍1-31)
    week (int|str) – 周 (范圍1-53)
    day_of_week (int|str) – 周內(nèi)第幾天或者星期幾 (范圍0-6 或者 mon,tue,wed,thu,fri,sat,sun)
    hour (int|str) – 時(shí) (范圍0-23)
    minute (int|str) – 分 (范圍0-59)
    second (int|str) – 秒 (范圍0-59)
    start_date (datetime|str) – 最早開始日期(包含)
    end_date (datetime|str) – 最晚結(jié)束時(shí)間(包含)
    timezone (datetime.tzinfo|str) – 指定時(shí)區(qū)
"""
# my_job將會在6,7,8,11,12月的第3個(gè)周五的1,2,3點(diǎn)運(yùn)行
sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 截止到2018-12-30 00:00:00,每周一到周五早上五點(diǎn)半運(yùn)行job_function
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2018-12-31')

# 表示2017年3月22日17時(shí)19分07秒執(zhí)行該程序
sched.add_job(my_job, 'cron', year=2017, month=3, day=22, hour=17, minute=19, second=7)

# 表示任務(wù)在6,7,8,11,12月份的第三個(gè)星期五的00:00,01:00,02:00,03:00 執(zhí)行該程序
sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

# 表示從星期一到星期五5:30(AM)直到2014-05-30 00:00:00
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

# 表示每5秒執(zhí)行該程序一次,相當(dāng)于interval 間隔調(diào)度中seconds = 5
sched.add_job(my_job, 'cron', second='*/5', args=['5秒定時(shí)'])
sched.start()
cron表達(dá)式參數(shù)描述
*anyFire on every value
*/aanyFire every a values, starting from the minimum
a-banyFire on any value within the a-b range (a must be smaller than b)
a-b/canyFire every c values within the a-b range
xth ydayFire on the x -th occurrence of weekday y within the month
last xdayFire on the last occurrence of weekday x within the month
lastdayFire on the last day within the month
x,y,zanyFire on any matching expression; can combine any number of any of the above expressions

使用SQLAlchemy作業(yè)存儲器存放作業(yè)

# -*- coding:utf-8 -*-
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
import logging
sched = BlockingScheduler()

def my_job():
    print('my_job is running, Now is %s' % datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# 使用sqlalchemy作業(yè)存儲器
# 根據(jù)自己電腦安裝的庫選擇用什么連接 ,如pymysql   其中:scrapy表示數(shù)據(jù)庫的名稱,操作數(shù)據(jù)庫之前應(yīng)創(chuàng)建對應(yīng)的數(shù)據(jù)庫
url = 'mysql+pymysql://root:123456@localhost:3306/scrapy?charset=utf8'
sched.add_jobstore('sqlalchemy', url=url)
# 添加作業(yè)
sched.add_job(my_job, 'interval', id='myjob', seconds=5)

log = logging.getLogger('apscheduler.executors.default')
log.setLevel(logging.INFO)  # DEBUG
# 設(shè)定日志格式
fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
h = logging.StreamHandler()
h.setFormatter(fmt)
log.addHandler(h)

sched.start()

暫停和恢復(fù)作業(yè)

# 暫停作業(yè):
apsched.job.Job.pause()
apsched.schedulers.base.BaseScheduler.pause_job()
# 恢復(fù)作業(yè):
apsched.job.Job.resume()
apsched.schedulers.base.BaseScheduler.resume_job()

獲得job列表

  • get_jobs(),它會返回所有的job實(shí)例;
  • 使用print_jobs()來輸出所有格式化的作業(yè)列表;
  • get_job(job_id=“任務(wù)ID”)獲取指定任務(wù)的作業(yè)列表。

代碼實(shí)現(xiàn):

# -*- coding:utf-8 -*-
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def my_job(text=""):
    print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

sched = BlockingScheduler()

job = sched.add_job(my_job, 'interval', seconds=2, args=['第一個(gè)作業(yè)'])
sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二個(gè)作業(yè)'])

print(sched.get_jobs())

print(sched.get_job(job_id="my_job_id"))

sched.print_jobs()
sched.start()

關(guān)閉調(diào)度器

默認(rèn)情況下調(diào)度器會等待所有正在運(yùn)行的作業(yè)完成后,關(guān)閉所有的調(diào)度器和作業(yè)存儲。如果你不想等待,可以將wait選項(xiàng)設(shè)置為False。

sched.shutdown()
sched.shutdown(wait=False)

到此這篇關(guān)于Python第三方模塊apscheduler安裝和基本使用的文章就介紹到這了,更多相關(guān)Python apscheduler安裝和使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論