欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

celery實(shí)現(xiàn)動態(tài)設(shè)置定時任務(wù)

 更新時間:2021年03月10日 09:34:09   作者:特侖蘇純酸奶  
這篇文章主要為大家詳細(xì)介紹了celery實(shí)現(xiàn)動態(tài)設(shè)置定時任務(wù),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下

本文實(shí)例為大家分享了celery動態(tài)設(shè)置定時任務(wù)的具體代碼,供大家參考,具體內(nèi)容如下

首先celery是一種異步任務(wù)隊(duì)列,如果還不熟悉這個開源軟件的請先看看官方文檔,快速入門。

這里講的動態(tài)設(shè)置定時任務(wù)的方法不使用數(shù)據(jù)庫保存定時任務(wù)的信息,所以是項(xiàng)目重啟后定時任務(wù)配置就會丟失,如果想保存成永久配置,可以考慮保存到數(shù)據(jù)庫、redis或者使用pickle、json保存成文件,在項(xiàng)目啟動時自動載入。

方法原理介紹

先來看一下celery的beat運(yùn)行過程。

上圖是beat的主要組成結(jié)構(gòu),beat中包含了一個service對象,service中包含了一個scheduler對象,scheduler中包含了一個schedule字典,schedule中key對應(yīng)的的value才是真正的定時任務(wù),是整個beat中最小的單元。

首先分別介紹一下各個對象和它們運(yùn)行的過程,beat是celery.apps.beat.Beat類創(chuàng)建的對象,調(diào)用beat.run()方法就可以啟動beat,下面是beat.run()方法的源碼。

def run(self):
 print(str(self.colored.cyan(
 'celery beat v{0} is starting.'.format(VERSION_BANNER))))
 self.init_loader()
 self.set_process_title()
 self.start_scheduler()

重點(diǎn)是在run()方法里調(diào)用了start_scheduler()方法,而start_scheduler()方法本質(zhì)上是創(chuàng)建了一個service對象(celery.beat.Service類),并調(diào)用service.start()方法,下面是beat.start_scheduler()方法的源碼。

def start_scheduler(self):
 if self.pidfile:
 platforms.create_pidlock(self.pidfile)
 service = self.Service(
 app=self.app,
 max_interval=self.max_interval,
 scheduler_cls=self.scheduler_cls,
 schedule_filename=self.schedule,
 )
 
 print(self.banner(service))
 
 self.setup_logging()
 if self.socket_timeout:
 logger.debug('Setting default socket timeout to %r',
 self.socket_timeout)
 socket.setdefaulttimeout(self.socket_timeout)
 try:
 self.install_sync_handler(service)
 service.start()
 except Exception as exc:
 logger.critical('beat raised exception %s: %r',
 exc.__class__, exc,
 exc_info=True)
 raise

調(diào)用了service.start()之后,會進(jìn)入一個死循環(huán),先使用self.scheduler.tick()獲取下一個任務(wù)a的定時點(diǎn)到現(xiàn)在時間的間隔,然后進(jìn)入睡眠,睡眠結(jié)束之后判斷如果self.scheduler里的下一個任務(wù)a可以執(zhí)行,就立即執(zhí)行,并獲取self.scheduler里的下下一個任務(wù)b的定時點(diǎn)到現(xiàn)在時間的間隔,進(jìn)入下一次循環(huán)。下面是service.start()的源碼。

def start(self, embedded_process=False):
 info('beat: Starting...')
 debug('beat: Ticking with max interval->%s',
 humanize_seconds(self.scheduler.max_interval))
 
 signals.beat_init.send(sender=self)
 if embedded_process:
 signals.beat_embedded_init.send(sender=self)
 platforms.set_process_title('celery beat')
 
 try:
 while not self._is_shutdown.is_set():
 interval = self.scheduler.tick()
 if interval and interval > 0.0:
 debug('beat: Waking up %s.',
 humanize_seconds(interval, prefix='in '))
 time.sleep(interval)
 if self.scheduler.should_sync():
 self.scheduler._do_sync()
 except (KeyboardInterrupt, SystemExit):
 self._is_shutdown.set()
 finally:
 self.sync()

service.scheduler默認(rèn)是celery.beat.PersistentScheduler類的實(shí)例對象,而celery.beat.PersistentScheduler其實(shí)是celery.beat.Scheduler的子類,所以scheduler.schedule是celery.beat.Scheduler類中的字典,保存的是celery.beat.ScheduleEntry類型的對象。ScheduleEntry的實(shí)例對象保存了定時任務(wù)的名稱、參數(shù)、定時信息、過期時間等信息。celery.beat.Scheduler類實(shí)現(xiàn)了對schedule的更新方法即update_from_dict(self, dict_)方法。下面是update_from_dict(self, dict_)方法的源碼。

def _maybe_entry(self, name, entry):
 if isinstance(entry, self.Entry):
 entry.app = self.app
 return entry
 return self.Entry(**dict(entry, name=name, app=self.app))
 
def update_from_dict(self, dict_):
 self.schedule.update({
 name: self._maybe_entry(name, entry)
 for name, entry in items(dict_)
 })

可以看到update_from_dict(self, dict_)方法實(shí)際上是向schedule中更新了self.Entry的實(shí)例對象,而self.Entry從celery.beat.Scheduler的源碼知道是celery.beat.ScheduleEntry。

到這里整個流程就粗略的介紹完了,基本過程是這個樣子。

但是從前面start_scheduler()的源碼可以看到,beat在內(nèi)部創(chuàng)建一個service之后,就直接進(jìn)入死循環(huán)了,所以從外面無法拿到service對象,就不能對service里的scheduler對象操作,就不能對scheduler的schedule字典操作,所以就無法在beat運(yùn)行的過程中動態(tài)添加定時任務(wù)。

方法介紹

前面介紹完原理,現(xiàn)在來講一下解決思路。主要思路就是讓start_scheduler方法中創(chuàng)建的service暴露出來。所以就想到手寫一個類去繼承Beat,重寫start_scheduler()方法。

import socket
from celery import platforms
from celery.apps.beat import Beat
 
 
class MyBeat(Beat):
 '''
 繼承Beat 添加一個獲取service的方法
 '''
 def start_scheduler(self):
 if self.pidfile:
  platforms.create_pidlock(self.pidfile)
 # 修改了獲取service的方式
 service = self.get_service()
 
 print(self.banner(service))
 
 self.setup_logging()
 if self.socket_timeout:
  logger.debug('Setting default socket timeout to %r',
    self.socket_timeout)
  socket.setdefaulttimeout(self.socket_timeout)
 try:
  self.install_sync_handler(service)
  service.start()
 except Exception as exc:
  logger.critical('beat raised exception %s: %r',
    exc.__class__, exc,
    exc_info=True)
  raise
 
 def get_service(self):
 '''
 這個是自定義的 目的是為了把service暴露出來,方便對service的scheduler操作,因?yàn)槎〞r任務(wù)信息都存放在service.scheduler里
 :return:
 '''
 service = getattr(self, "service", None)
 if service is None:
  service = self.Service(
  app=self.app,
  max_interval=self.max_interval,
  scheduler_cls=self.scheduler_cls,
  schedule_filename=self.schedule,
  )
  setattr(self, "service", service)
 return self.service

在MyBeat類中添加一個get_service()方法,如果beat沒有servic對象就創(chuàng)建一個,如果有就直接返回,方便對service的scheduler操作。

然后在此基礎(chǔ)上實(shí)現(xiàn)對定時任務(wù)的增刪改查操作。

def add_cron_task(task_name: str, cron_task: str, minute='*', hour='*', day_of_week='*', day_of_month='*',
   month_of_year='*', **kwargs):
 '''
 創(chuàng)建或更新定時任務(wù)
 :param task_name: 定時任務(wù)名稱
 :param cron_task: task名稱
 :param minute: 以下是時間
 :param hour:
 :param day_of_week:
 :param day_of_month:
 :param month_of_year:
 :param kwargs:
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 entries = dict()
 entries[task_name] = {
 'task': cron_task,
 'schedule': crontab(minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month,
    month_of_year=month_of_year, **kwargs),
 'options': {'expires': 3600}}
 scheduler.update_from_dict(entries)
 
 
def del_cron_task(task_name: str):
 '''
 刪除定時任務(wù)
 :param task_name:
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 if scheduler.schedule.get(task_name, None) is not None:
 del scheduler.schedule[task_name]
 
 
def get_cron_task():
 '''
 獲取當(dāng)前所有定時任務(wù)的配置
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 ret = [{k: {"task": v.task, "crontab": v.schedule}} for k, v in scheduler.schedule.items()]
 return ret

但是僅僅是這樣還不能解決問題,從前面的serive.start()的源碼看到,beat啟動后會進(jìn)入一個死循環(huán),如果直接在主線程啟動beat,必然會阻塞在死循環(huán)中,所以需要為beat創(chuàng)建一個子線程,這樣才影響主線程的其他操作。

flag = False
 
beat = MyBeat(max_interval=10, app=celery_app, socket_timeout=30, pidfile=None, no_color=None,
  loglevel='INFO', logfile=None, schedule=None, scheduler='celery.beat.PersistentScheduler',
  scheduler_cls=None, # XXX use scheduler
  redirect_stdouts=None,
  redirect_stdouts_level=None)
 
 
# 設(shè)置主動啟動beat是為了避免使用celery -A celery_demo worker 命令重復(fù)啟動worker
def run():
 '''
 啟動Beat
 :return:
 '''
 beat.run()
 
 
def new_thread():
 '''
 創(chuàng)建一個線程啟動Beat 最多只能創(chuàng)建一個
 :return:
 '''
 global flag
 if not flag:
 t = threading.Thread(target=run, daemon=True)
 t.start()
 # 啟動成功2s后才能操作定時任務(wù) 否則可能會報(bào)錯
 time.sleep(2)
 flag = True

可能看到上面的代碼有人會想,為什么不在主程序加載完成就啟動為beat創(chuàng)建一個子線程,還非要寫個函數(shù)等待主動調(diào)用?這是因?yàn)槔缭谑褂胐jango+celery組合時,一般啟動django和啟動celery woker是兩個獨(dú)立的進(jìn)程,如果讓django在加載代碼的時候自動啟動beat的子線程,那么在使用celery -A demo_name worker 啟動celery時,會重新加載一邊django的代碼,因?yàn)閏elery需要掃描每個app下的tasks.py文件,加載異步任務(wù)函數(shù),這時啟動celery woker就會也啟動一個beat子線程,可能會造成定時任務(wù)重復(fù)執(zhí)行的情況。所以在這里設(shè)置成主動開啟beat子線程,目的就是為了celery worker啟動不重復(fù)創(chuàng)建beat線程。

完整的代碼如下:

import socket
import time
import threading
from celery import platforms
from celery.schedules import crontab
from celery.apps.beat import Beat
from celery.utils.log import get_logger
from celery_demo import celery_app
 
logger = get_logger('celery.beat')
flag = False
 
 
class MyBeat(Beat):
 '''
 繼承Beat 添加一個獲取service的方法
 '''
 def start_scheduler(self):
 if self.pidfile:
  platforms.create_pidlock(self.pidfile)
 # 修改了獲取service的方式
 service = self.get_service()
 
 print(self.banner(service))
 
 self.setup_logging()
 if self.socket_timeout:
  logger.debug('Setting default socket timeout to %r',
    self.socket_timeout)
  socket.setdefaulttimeout(self.socket_timeout)
 try:
  self.install_sync_handler(service)
  service.start()
 except Exception as exc:
  logger.critical('beat raised exception %s: %r',
    exc.__class__, exc,
    exc_info=True)
  raise
 
 def get_service(self):
 '''
 這個是自定義的 目的是為了把service暴露出來,方便對service的scheduler操作,因?yàn)槎〞r任務(wù)信息都存放在service.scheduler里
 :return:
 '''
 service = getattr(self, "service", None)
 if service is None:
  service = self.Service(
  app=self.app,
  max_interval=self.max_interval,
  scheduler_cls=self.scheduler_cls,
  schedule_filename=self.schedule,
  )
  setattr(self, "service", service)
 return self.service
 
 
beat = MyBeat(max_interval=10, app=celery_app, socket_timeout=30, pidfile=None, no_color=None,
  loglevel='INFO', logfile=None, schedule=None, scheduler='celery.beat.PersistentScheduler',
  scheduler_cls=None, # XXX use scheduler
  redirect_stdouts=None,
  redirect_stdouts_level=None)
 
 
# 設(shè)置主動啟動beat是為了避免使用celery -A celery_demo worker 命令重復(fù)啟動worker
def run():
 '''
 啟動Beat
 :return:
 '''
 beat.run()
 
 
def new_thread():
 '''
 創(chuàng)建一個線程啟動Beat 最多只能創(chuàng)建一個
 :return:
 '''
 global flag
 if not flag:
 t = threading.Thread(target=run, daemon=True)
 t.start()
 # 啟動成功2s后才能操作定時任務(wù) 否則可能會報(bào)錯
 time.sleep(2)
 flag = True
 
 
def add_cron_task(task_name: str, cron_task: str, minute='*', hour='*', day_of_week='*', day_of_month='*',
   month_of_year='*', **kwargs):
 '''
 創(chuàng)建或更新定時任務(wù)
 :param task_name: 定時任務(wù)名稱
 :param cron_task: task名稱
 :param minute: 以下是時間
 :param hour:
 :param day_of_week:
 :param day_of_month:
 :param month_of_year:
 :param kwargs:
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 entries = dict()
 entries[task_name] = {
 'task': cron_task,
 'schedule': crontab(minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month,
    month_of_year=month_of_year, **kwargs),
 'options': {'expires': 3600}}
 scheduler.update_from_dict(entries)
 
 
def del_cron_task(task_name: str):
 '''
 刪除定時任務(wù)
 :param task_name:
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 if scheduler.schedule.get(task_name, None) is not None:
 del scheduler.schedule[task_name]
 
 
def get_cron_task():
 '''
 獲取當(dāng)前所有定時任務(wù)的配置
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 ret = [{k: {"task": v.task, "crontab": v.schedule}} for k, v in scheduler.schedule.items()]
 return ret

另外還可以參考我的github,相關(guān)的注釋在代碼里寫的較為清晰。

注意:使用這種方式添加/刪除定時任務(wù)只是保存在內(nèi)存中的,項(xiàng)目重啟后就會丟失。如果想要持久化,可以參照上面的方法,把相關(guān)信息保存到數(shù)據(jù)庫或其他可持久保存文件中,在beat線程啟動時加載相關(guān)任務(wù)信息,在對定時任務(wù)修改做增刪改時及時修改數(shù)據(jù)庫或文件中內(nèi)容。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • 如何利用Python實(shí)現(xiàn)簡易的音頻播放器

    如何利用Python實(shí)現(xiàn)簡易的音頻播放器

    這篇文章主要介紹了如何利用Python實(shí)現(xiàn)簡易的音頻播放器,需要用到的庫有pygame和tkinter,實(shí)現(xiàn)音頻播放的功能,供大家學(xué)習(xí)參考,希望對你有所幫助
    2022-03-03
  • 教你用python控制安卓手機(jī)

    教你用python控制安卓手機(jī)

    不久前,我在思考如何通過向好友發(fā)送幾分鐘的垃圾郵件來惹惱我的朋友,而在做一些研究的過程中,我遇到了Android調(diào)試橋.在本快速指南中,我將向您展示如何使用Python與之交互以及如何創(chuàng)建2個快速腳本.需要的朋友可以參考下
    2021-05-05
  • Python中幾個比較常見的名詞解釋

    Python中幾個比較常見的名詞解釋

    這篇文章主要介紹了Python中幾個比較常見的名詞解釋,本文解釋同樣適應(yīng)其它編程語言,本文講解了循環(huán)、迭代、遞歸、遍歷等名詞的含義,需要的朋友可以參考下
    2015-07-07
  • Python對列表的操作知識點(diǎn)詳解

    Python對列表的操作知識點(diǎn)詳解

    在本篇文章里小編給大家整理了關(guān)于Python對列表的操作知識點(diǎn)總結(jié)以及實(shí)例代碼運(yùn)用,需要的朋友們跟著學(xué)習(xí)下。
    2019-08-08
  • 利用Python多處理庫處理3D數(shù)據(jù)詳解

    利用Python多處理庫處理3D數(shù)據(jù)詳解

    本文將介紹處理大量數(shù)據(jù)時非常方便的工具,例如tqdm與 multiprocessing?imap??一起使用、并行處理檔案、繪制和處理3D數(shù)據(jù)等,感興趣的小伙伴可以了解一下
    2021-12-12
  • Python變量格式化輸出實(shí)現(xiàn)原理解析

    Python變量格式化輸出實(shí)現(xiàn)原理解析

    這篇文章主要介紹了Python變量格式化輸出實(shí)現(xiàn)原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-08-08
  • python os模塊使用方法介紹

    python os模塊使用方法介紹

    OS ( Operating System 操作系統(tǒng) ) 操作系統(tǒng)模塊;它是屬于python的標(biāo)準(zhǔn)庫,常用于處理文件和目錄(文件夾)的操作。本文為大家總結(jié)了這個模塊的常用方法,希望有所幫助
    2022-08-08
  • Python報(bào)錯SyntaxError:unexpected?EOF?while?parsing的解決辦法

    Python報(bào)錯SyntaxError:unexpected?EOF?while?parsing的解決辦法

    在運(yùn)行或編寫一個程序時常會遇到錯誤異常,這時python會給你一個錯誤提示類名,告訴出現(xiàn)了什么樣的問題,下面這篇文章主要給大家介紹了關(guān)于Python報(bào)錯SyntaxError:unexpected?EOF?while?parsing的解決辦法,需要的朋友可以參考下
    2022-07-07
  • 如何讓PyQt5中QWebEngineView與JavaScript交互

    如何讓PyQt5中QWebEngineView與JavaScript交互

    這篇文章主要介紹了如何讓PyQt5中QWebEngineView與JavaScript交互,幫助大家更好的理解和學(xué)習(xí)PyQt5框架,感興趣的朋友可以了解下
    2020-10-10
  • python實(shí)現(xiàn)的希爾排序算法實(shí)例

    python實(shí)現(xiàn)的希爾排序算法實(shí)例

    這篇文章主要介紹了python實(shí)現(xiàn)的希爾排序算法,實(shí)例分析了基于Python實(shí)現(xiàn)希爾排序的相關(guān)技巧,需要的朋友可以參考下
    2015-07-07

最新評論