欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python?Celery動態(tài)添加定時任務生產(chǎn)實踐指南

 更新時間:2022年08月19日 09:41:20   作者:李俊的博客  
elery是一種異步任務隊列,如果還不熟悉這個開源軟件的請先看看官方文檔,快速入門,下面這篇文章主要給大家介紹了關于Python?Celery動態(tài)添加定時任務生產(chǎn)實踐的相關資料,需要的朋友可以參考下

一、背景

實際工作中會有一些耗時的異步任務需要使用定時調(diào)度,比如發(fā)送郵件,拉取數(shù)據(jù),執(zhí)行定時腳本

通過celery 實現(xiàn)調(diào)度主要思想是 通過引入中間人redis,啟動 worker 進行任務執(zhí)行 ,celery-beat進行定時任務數(shù)據(jù)存儲

二、Celery動態(tài)添加定時任務的官方文檔

celery文檔:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

celery 自定義調(diào)度類說明: 

自定義調(diào)度器類可以在命令行中指定(--scheduler參數(shù))

django-celery-beat文檔 : https://pypi.org/project/django-celery-beat/

關于django-celery-beat 插件的說明: 

此擴展使您能夠?qū)⒍ㄆ谌蝿沼媱澊鎯υ跀?shù)據(jù)庫中,可以從 Django 管理界面管理周期性任務,您可以在其中創(chuàng)建、編輯和刪除周期性任務以及它們應該運行的頻率

三、celery簡單實用

3.1 基礎環(huán)境配置

1. 安裝最新版本的Django

pip3 install django #當前我安裝的版本是 3.0.6

2. 創(chuàng)建項目

django-admin startproject typeidea
django-admin startapp blog

3.安裝 celery

pip3 install django-celery
pip3 install -U Celery 
pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
pip3 install django-celery-beat # 用于動態(tài)添加定時任務
pip3 install django-celery-results
pip3 install redis

3.2 測試使用Celery應用

1. 創(chuàng)建blog目錄、新建task.py

首先在Django項目中創(chuàng)建一個blog文件夾,并且在blog文件夾下創(chuàng)建tasks.py模塊, 如下:

 tasks.py代碼如下: 

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
 
"""
#File: tasks.py
#Time: 2022/3/30 2:26 下午
#Author: julius
"""
from celery import Celery
 
# 使用redis做為broker
app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
 
# 創(chuàng)建任務函數(shù)
@app.task
def my_task():
    print('任務正在執(zhí)行...')

Celery第一個參數(shù)是給其設定一個名字, 第二參數(shù)我們設定一個中間人broker, 在這里我們使用Redis作為中間人。my_task函數(shù)是我們編寫的一個任務函數(shù), 通過加上裝飾器app.task, 將其注冊到broker的隊列中。

2. 啟動redis、創(chuàng)建worker

現(xiàn)在我們在創(chuàng)建一個worker, 等待處理隊列中的任務。

進入項目的根目錄,執(zhí)行命令: celery -A celery_tasks.tasks worker -l info

 3. 調(diào)用任務

下面來測試一下功能,創(chuàng)建一個任務,加入任務隊列中,提供worker執(zhí)行。

進入python終端, 執(zhí)行如下代碼:

$ python manage.py shell
>>> from blog.tasks import my_task
>>> my_task.delay()
<AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>

調(diào)用一個任務函數(shù),將會返回一個AsyncResult對象,這個對象可以用來檢查任務的狀態(tài)或者獲得任務的返回值。

4. 查看結(jié)果

在worker的終端查看任務執(zhí)行情況,可以看到已經(jīng)收到83484dfe-f729-417b-8e51-6c7ae32a1377 任務,并打印了任務執(zhí)行信息

5. 存儲并查看任務執(zhí)行狀態(tài)

把任務執(zhí)行結(jié)果賦值給ret,然后調(diào)用result() 會產(chǎn)生 DisabledBackend 報錯,可見沒有配置后端存儲的時候并不能保存任務執(zhí)行的狀態(tài)信息,下一節(jié)我們會講到如何配置backend保存任務執(zhí)行結(jié)果

$ python manage.py shell
>>> from blog.tasks import my_task
>>> ret=my_task.delay()
>>> ret.result()

四、配置backend存儲任務執(zhí)行結(jié)果 

如果我們想跟蹤任務的狀態(tài),Celery需要將結(jié)果保存到某個地方。有幾種保存的方案可選:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

1. 添加backend參數(shù)

在本例中我們使用Redis作為存儲結(jié)果的方案,通過Celery的backend參數(shù)來設定任務結(jié)果存儲地址。我們將tasks模塊修改如下:

from celery import Celery
 
# 使用redis作為broker以及backend
app = Celery('celery_tasks.tasks',
             broker='redis://127.0.0.1:6379/8',
             backend='redis://127.0.0.1:6379/9')
 
# 創(chuàng)建任務函數(shù)
@app.task
def my_task(a, b):
    print("任務函數(shù)正在執(zhí)行....")
    return a + b

給Celery增加了backend參數(shù),指定redis作為結(jié)果存儲,并將任務函數(shù)修改為兩個參數(shù),并且有返回值。

2. 調(diào)用任務/查看任務執(zhí)行結(jié)果

下面再來執(zhí)行調(diào)用一下這個任務看看。

$ python manage.py shell
>>> from blog.tasks import my_task
>>> res=my_task.delay(10,40)
>>> res.result
50
>>> res.failed()
False

再來看看worker的執(zhí)行情況,如下:

可以看到celery任務已經(jīng)執(zhí)行成功了。

但是這只是一個開始,下一步要看看如何添加定時的任務。

四、優(yōu)化Celery目錄結(jié)構(gòu)

上面直接將Celery的應用創(chuàng)建、配置、tasks任務全部寫在了一個文件,這樣在后面項目越來越大,也是不方便的。下面來拆分一下,并且添加一些常用的參數(shù)。

基本結(jié)構(gòu)如下

$ vim typeidea/celery.py (Celery應用文件)

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
 
"""
#File: celery.py
#Time: 2022/3/30 12:25 下午
#Author: julius
"""
import os
from celery import Celery
from blog import celeryconfig
project_name='typeidea'
# set the default django setting module for the 'celery' program
os.environ.setdefault('DJANGO_SETTINGS_MODULE','typeidea.settings')
app = Celery(project_name)
 
app.config_from_object('django.conf:settings')
 
app.autodiscover_tasks()

vim blog/celeryconfig.py (配置Celery的參數(shù)文件)

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
 
"""
#File: celeryconfig.py
#Time: 2022/3/30 2:54 下午
#Author: julius
"""

# 設置結(jié)果存儲
from typeidea import settings
import os
 
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
# 設置代理人broker
BROKER_URL = 'redis://127.0.0.1:6379/1'
# celery 的啟動工作數(shù)量設置
CELERY_WORKER_CONCURRENCY = 20
# 任務預取功能,就是每個工作的進程/線程在獲取任務的時候,會盡量多拿 n 個,以保證獲取的通訊成本可以壓縮。
CELERYD_PREFETCH_MULTIPLIER = 20
# 非常重要,有些情況下可以防止死鎖
CELERYD_FORCE_EXECV = True
# celery 的 worker 執(zhí)行多少個任務后進行重啟操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果網(wǎng)絡資源有限,不建議開足馬力。
CELERY_DISABLE_RATE_LIMITS = True
 
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

vim blog/tasks.py (tasks 任務文件)

import time
from blog.celery import app
 
# 創(chuàng)建任務函數(shù)
@app.task
def my_task(a, b, c):
    print('任務正在執(zhí)行...')
    print('任務1函數(shù)休眠10s')
    time.sleep(10)
    return a + b + c

五、開始使用django-celery-beat調(diào)度器

使用 django-celery-beat 動態(tài)添加定時任務  celery 4.x 版本在 django 框架中是使用 django-celery-beat 進行動態(tài)添加定時任務的。前面雖然已經(jīng)安裝了這個庫,但是還要再說明一下。

官網(wǎng)的配置說明
https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

1. 安裝 django-celery-beat

pip3 install django-celery-beat

2.在項目的 settings 文件配置 django-celery-beat 

INSTALLED_APPS = [
    'blog',
    'django_celery_beat',
    ...
]
 
# Django設置時區(qū)
LANGUAGE_CODE = 'zh-hans'  # 使用中國語言
TIME_ZONE = 'Asia/Shanghai'  # 設置Django使用中國上海時間
# 如果USE_TZ設置為True時,Django會使用系統(tǒng)默認設置的時區(qū),此時的TIME_ZONE不管有沒有設置都不起作用
# 如果USE_TZ 設置為False,TIME_ZONE = 'Asia/Shanghai', 則使用上海的UTC時間。
USE_TZ = False

3. 創(chuàng)建 django-celery-beat 相關表

執(zhí)行Django數(shù)據(jù)庫遷移: python manage.py migrate

4. 配置Celery使用 django-celery-beat

配置 celery.py

import os
 
from celery import Celery
 
from blog import celeryconfig
 
# 為celery 設置環(huán)境變量
os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
# 創(chuàng)建celery app
app = Celery('blog')
# 從單獨的配置模塊中加載配置
app.config_from_object(celeryconfig)
 
# 設置app自動加載任務
app.autodiscover_tasks([
    'blog',
])

配置 celeryconfig.py

# 設置結(jié)果存儲
from typeidea import settings
import os
 
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
# 設置代理人broker
BROKER_URL = 'redis://127.0.0.1:6379/1'
# celery 的啟動工作數(shù)量設置
CELERY_WORKER_CONCURRENCY = 20
# 任務預取功能,就是每個工作的進程/線程在獲取任務的時候,會盡量多拿 n 個,以保證獲取的通訊成本可以壓縮。
CELERYD_PREFETCH_MULTIPLIER = 20
# 非常重要,有些情況下可以防止死鎖
CELERYD_FORCE_EXECV = True
# celery 的 worker 執(zhí)行多少個任務后進行重啟操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果網(wǎng)絡資源有限,不建議開足馬力。
CELERY_DISABLE_RATE_LIMITS = True
 
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
 

編寫任務 tasks.py

import time
from celery import Celery
from blog.celery import app
 
# 使用redis做為broker
# app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/1')
 
# 創(chuàng)建任務函數(shù)
@app.task
def my_task(a, b, c):
    print('任務正在執(zhí)行...')
    print('任務1函數(shù)休眠10s')
    time.sleep(10)
    return a + b + c
 
@app.task
def my_task2():
    print("任務2函數(shù)正在執(zhí)行....")
    print('任務2函數(shù)休眠10s')
    time.sleep(10)

5. 啟動定時任務work

啟動定時任務首先需要有一個work執(zhí)行異步任務,然后再啟動一個定時器觸發(fā)任務。

啟動任務 work

$ celery -A blog worker -l info 

啟動定時器觸發(fā) beat

celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

六、具體操作演練

6.1 創(chuàng)建基于間隔時間的周期性任務

1. 初始化周期間隔對象interval 對象

>>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
>>> schedule, created = IntervalSchedule.objects.get_or_create( 
...       every=10, 
...       period=IntervalSchedule.SECONDS, 
...  )
>>> IntervalSchedule.objects.all()
<QuerySet [<IntervalSchedule: every 10 seconds>]>

2.創(chuàng)建一個無參數(shù)的周期性間隔任務

>>>PeriodicTask.objects.create(interval=schedule,name='my_task2',task='blog.tasks.my_task2',)
<PeriodicTask: my_task2: every 10 seconds>

beat 調(diào)度服務日志顯示如下:

 worker 服務日志顯示如下:

3.創(chuàng)建一個帶參數(shù)的周期性間隔任務

>>> PeriodicTask.objects.create(interval=schedule,name='my_task',task='blog.tasks.my_task',args=json.dumps([10,20,30]))
<PeriodicTask: my_task: every 10 seconds>

beat 調(diào)度服務日志結(jié)果:

 worker 服務日志結(jié)果:

4.如何高并發(fā)執(zhí)行任務

需要并行執(zhí)行任務的時候,就需要設置多個worker來執(zhí)行任務。 

6.2 創(chuàng)建一個不帶參數(shù)的周期性間隔任務

1.初始化 crontab 的調(diào)度對象

>>> import pytz
>>> schedule, _ = CrontabSchedule.objects.get_or_create(
... minute='*',
... hour='*',
... day_of_week='*',
... day_of_month='*',
... timezone=pytz.timezone('Asia/Shanghai')
... )

2. 創(chuàng)建不帶參數(shù)的定時任務

PeriodicTask.objects.create(crontab=schedule,name='my_task2_crontab',task='blog.tasks.my_task2',)

beat 調(diào)度服務執(zhí)行結(jié)果 

 worker 執(zhí)行服務結(jié)果

6.3 周期性任務的查詢、刪除操作

1. 周期性任務的查詢

>>> PeriodicTask.objects.all()
<ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
>>> PeriodicTask.objects.get(name='my_task2_crontab')
<PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
>>> for task in PeriodicTask.objects.all():
...     print(task.id)
... 
1
13
>>> PeriodicTask.objects.get(id=13)
<PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
>>> PeriodicTask.objects.get(name='my_task2_crontab')
<PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>

 控制臺實際操作記錄

2.周期性任務的暫停/啟動

2.1 設置my_taks2_crontab 暫停任務

>>> my_task2_crontab = PeriodicTask.objects.get(id=13)
>>> my_task2_crontab.enabled
True
>>> my_task2_crontab.enabled=False
>>> my_task2_crontab.save()

查看worker輸出:

 可以看到worker從19:31以后已經(jīng)沒有輸出了,說明已經(jīng)成功吧my_task2_crontab 任務暫停

2.2 設置my_task2_crontab 開啟任務

把任務的 enabled 為 True 即可:

>>> my_task2_crontab.enabled
False
>>> my_task2_crontab.enabled=True
>>> my_task2_crontab.save()

查看worker輸出:

 可以看到worker從19:36開始有輸出,說明已把my_task2_crontab 任務重新啟動

3. 周期性任務的刪除

獲取到指定的任務后調(diào)用delete(),再次查詢指定任務會發(fā)現(xiàn)已經(jīng)不存在了

PeriodicTask.objects.get(name='my_task2_crontab').delete()
>>> PeriodicTask.objects.get(name='my_task2_crontab')
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
  File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
    raise self.model.DoesNotExist(
django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.

總結(jié)

到此這篇關于Python Celery動態(tài)添加定時任務生產(chǎn)實踐的文章就介紹到這了,更多相關Celery動態(tài)添加定時任務內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Python第三方庫undetected_chromedriver的使用

    Python第三方庫undetected_chromedriver的使用

    這篇文章主要給大家介紹了關于Python第三方庫undetected_chromedriver的使用方法,文中通過實例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2023-01-01
  • python?使用第三方庫requests-toolbelt?上傳文件流的示例

    python?使用第三方庫requests-toolbelt?上傳文件流的示例

    這篇文章主要介紹了python?使用第三方庫requests-toolbelt?上傳文件流,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-09-09
  • Python?實現(xiàn)驅(qū)動AI機器人

    Python?實現(xiàn)驅(qū)動AI機器人

    這篇文章主要介紹了Python?實現(xiàn)驅(qū)動AI機器人,下文圍繞利用Python?實現(xiàn)驅(qū)動AI機器人的相關資料展開內(nèi)容,需要的小伙伴可以參考一下
    2022-02-02
  • python處理二進制數(shù)據(jù)的方法

    python處理二進制數(shù)據(jù)的方法

    這篇文章主要介紹了python處理二進制數(shù)據(jù)的方法,涉及Python針對二進制數(shù)據(jù)的相關操作技巧,需要的朋友可以參考下
    2015-06-06
  • 基于pyinstaller超級加密操作(加殼和轉(zhuǎn)c)

    基于pyinstaller超級加密操作(加殼和轉(zhuǎn)c)

    這篇文章主要介紹了基于pyinstaller超級加密操作 (加殼和轉(zhuǎn)c),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-03-03
  • python中的標準庫html

    python中的標準庫html

    html庫是用于解析HTML的一個工具,是python自帶的標準庫之一,今天通過本文給大家介紹下python中的標準庫html,感興趣的朋友一起看看吧
    2022-04-04
  • Python 分布式緩存之Reids數(shù)據(jù)類型操作詳解

    Python 分布式緩存之Reids數(shù)據(jù)類型操作詳解

    這篇文章主要介紹了Python 分布式緩存之Reids數(shù)據(jù)類型操作詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-06-06
  • Python?Matplotlib繪制動圖平滑曲線

    Python?Matplotlib繪制動圖平滑曲線

    這篇文章主要介紹了Python?Matplotlib繪制動圖平滑曲線,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考一下,需要的小伙伴可以參考一下
    2022-08-08
  • python爬取某網(wǎng)站原圖作為壁紙

    python爬取某網(wǎng)站原圖作為壁紙

    之前已經(jīng)爬取過網(wǎng)站上的圖片,貌似很簡單可是他喵的都像馬賽克一樣,怎么能用做壁紙呢通過多重審查發(fā)現(xiàn),原圖地址藏在更深的地方 所以,來爬一下原圖吧,需要的朋友可以參考下
    2021-06-06
  • python輸出100以內(nèi)的質(zhì)數(shù)與合數(shù)實例代碼

    python輸出100以內(nèi)的質(zhì)數(shù)與合數(shù)實例代碼

    本文通過實例代碼給大家介紹了python輸出100以內(nèi)的質(zhì)數(shù)與合數(shù)的方法,非常不錯,具有一定的參考借鑒價值,需要的朋友參考下吧
    2018-07-07

最新評論