Django集成Celery之狀態(tài)監(jiān)控與任務(wù)管理詳解
如何通過 Django 來管理 Celery 任務(wù)?通過 Django Admin 界面提供任務(wù)的查詢、查看、重試、終止等功能?下面是一個(gè)完整的步驟指南。
使用 Django 管理 Celery Worker
安裝 Django 和相關(guān)包
首先,創(chuàng)建一個(gè)新的虛擬環(huán)境并安裝所需的包。
python -m venv myenv source myenv/bin/activate # Windows 系統(tǒng)使用: myenv\Scripts\activate pip install django django-celery-results django-celery-beat celery
創(chuàng)建 Django 項(xiàng)目和應(yīng)用
django-admin startproject myproject cd myproject django-admin startapp myapp
配置 Django 和 Celery
在 myproject/settings.py
文件中添加以下內(nèi)容:
INSTALLED_APPS = [ ..., 'django_celery_results', 'django_celery_beat', 'myapp', # 確保 app 在這個(gè)列表里 ] CELERY_BROKER_URL = 'redis://localhost:6379/0' # 使用 Redis 作為示例,可以根據(jù)需求更改 CELERY_RESULT_BACKEND = 'django-db' CELERY_CACHE_BACKEND = 'django-cache' CELERY_TRACK_STARTED = True CELERY_SEND_EVENTS = True # 確保已經(jīng)配置了數(shù)據(jù)庫 DATABASES = { 'default': { 'ENGINE': 'django.db.backends.sqlite3', 'NAME': BASE_DIR / 'db.sqlite3', } } # 配置 Django 緩存(可選) CACHES = { 'default': { 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', } }
在 myproject
目錄中創(chuàng)建一個(gè) celery.py
文件:
from __future__ import absolute_import, unicode_literals import os from celery import Celery from django.conf import settings # 設(shè)置 Django 的配置模塊 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings') app = Celery('myproject') # 從 Django 的設(shè)置中配置 Celery app.config_from_object('django.conf:settings', namespace='CELERY') # 自動(dòng)發(fā)現(xiàn)任務(wù) # app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print(f'Request: {self.request!r}')
修改 myproject/__init__.py
文件,使得 Django 在啟動(dòng)時(shí)加載 Celery:
from __future__ import absolute_import, unicode_literals # 這將確保當(dāng) Django 啟動(dòng)時(shí)加載 app.py from .celery import app as celery_app __all__ = ('celery_app',)
創(chuàng)建一個(gè) Celery 任務(wù)
在 myapp/tasks.py
中創(chuàng)建一個(gè)簡(jiǎn)單的 Celery 任務(wù):
from celery import shared_task @shared_task def add(x, y): return x + y
注冊(cè)自定義的 TaskResultAdmin
我們需要在自定義 TaskResultAdmin
之前先取消已經(jīng)注冊(cè)的模型。
在 myapp/admin.py
中做如下修改:
from django.contrib import admin from django_celery_results.models import TaskResult from django_celery_results.admin import TaskResultAdmin as DefaultTaskResultAdmin from django.urls import path from django.shortcuts import redirect from celery.result import AsyncResult from myproject.celery import app # 取消已經(jīng)注冊(cè)的 TaskResult admin.site.unregister(TaskResult) # 創(chuàng)建一個(gè)自定義的 TaskResultAdmin 繼承自默認(rèn)的 TaskResultAdmin class CustomTaskResultAdmin(DefaultTaskResultAdmin): change_list_template = "admin/celery_task_changelist.html" def get_urls(self): urls = super().get_urls() custom_urls = [ path('retry/<task_id>/', self.admin_site.admin_view(self.retry_task), name='retry-task'), path('terminate/<task_id>/', self.admin_site.admin_view(self.terminate_task), name='terminate-task'), ] return custom_urls + urls def retry_task(self, request, task_id, *args, **kwargs): AsyncResult(task_id, app=app).reapply() self.message_user(request, f'Task {task_id} retried successfully.') return redirect('..') def terminate_task(self, request, task_id, *args, **kwargs): AsyncResult(task_id, app=app).revoke(terminate=True) self.message_user(request, f'Task {task_id} terminated successfully.') return redirect('..') # 注冊(cè)自定義的 TaskResultAdmin admin.site.register(TaskResult, CustomTaskResultAdmin)
TaskResult
模型已經(jīng)被 django_celery_results
自動(dòng)注冊(cè)到 Django Admin 中了。
我們可以通過繼承 django_celery_results
的 TaskResultAdmin
并覆蓋的方式來避免重復(fù)注冊(cè)模型。
創(chuàng)建 Django Admin 界面的自定義模板
在 Django 項(xiàng)目中創(chuàng)建以下目錄結(jié)構(gòu) templates/admin
并在 admin
文件夾內(nèi)創(chuàng)建 celery_task_changelist.html
:
{% extends "admin/change_list.html" %} {% block result_list %} {{ block.super }} <script> function handleTask(action, task_id) { fetch(`/${action}/${task_id}/`, { method: 'POST', headers: { 'X-CSRFToken': document.querySelector('[name=csrfmiddlewaretoken]') .value, }, }).then((response) => { if (response.ok) { location.reload(); } else { alert('Action failed.'); } }); } </script> <div> <form method="post"> {% csrf_token %} {% for result in cl.result_list %} <button type="button" onclick="handleTask('retry', '{{ result.task_id }}')"> Retry </button> <button type="button" onclick="handleTask('terminate', '{{ result.task_id }}')" > Terminate </button> {% endfor %} </form> </div> {% endblock %}
確保自定義的模板路徑正確。對(duì)于默認(rèn) Django 項(xiàng)目模板目錄,模板文件夾應(yīng)該在 myproject/templates/admin/celery_task_changelist.html
。
運(yùn)行 Django 和 Celery
- 應(yīng)用數(shù)據(jù)庫遷移:
python manage.py migrate
- 啟動(dòng) Django 服務(wù)器:
python manage.py runserver
- 啟動(dòng) Celery worker:
celery -A myproject worker -l info
使用 Django Admin 管理 Celery 任務(wù)
打開瀏覽器并訪問 http://127.0.0.1:8000/admin/
,Celery 任務(wù)將會(huì)在 Django admin 界面中顯示,并且可以通過點(diǎn)擊按鈕來進(jìn)行查詢、查看、重試和終止等操作。
這樣就完成了通過 Django Admin 界面管理 Celery 任務(wù)的完整步驟。如有需要可以進(jìn)一步定制和優(yōu)化界面和功能。
啟動(dòng) Django 本地的 Celery Worker
為了在啟動(dòng) Celery Worker 后向 Worker 發(fā)起任務(wù),并在 Django Admin 界面演示查詢、查看、重試和終止任務(wù),可以按以下步驟進(jìn)行操作:
創(chuàng)建 Celery 任務(wù)
在 myapp/tasks.py
中定義一些示例任務(wù):
# myapp/tasks.py from celery import shared_task import time @shared_task def add(x, y): time.sleep(10) # 模擬長(zhǎng)時(shí)間運(yùn)行的任務(wù) return x + y @shared_task def long_running_task(duration): time.sleep(duration) return f"Task completed after {duration} seconds"# myapp/tasks.py from celery import shared_task import time @shared_task def add(x, y): time.sleep(10) # 模擬長(zhǎng)時(shí)間運(yùn)行的任務(wù) return x + y @shared_task def long_running_task(duration): time.sleep(duration) return f"Task completed after {duration} seconds"
創(chuàng)建觸發(fā)任務(wù)的視圖
為了便于演示,可以創(chuàng)建一些視圖來觸發(fā)這些任務(wù)。更新 urls.py
和 views.py
文件。
- 在
myapp/views.py
中:
# myapp/views.py from django.http import JsonResponse from myapp.tasks import add, long_running_task def trigger_add_task(request): add.delay(3, 4) return JsonResponse({'status': 'Task add (3, 4) triggered'}) def trigger_long_running_task(request): long_running_task.delay(30) # 任務(wù)運(yùn)行30秒 return JsonResponse({'status': 'Long running task for 30 seconds triggered'})
- 在
myapp/urls.py
中:
# myapp/urls.py from django.urls import path from .views import trigger_add_task, trigger_long_running_task urlpatterns = [ path('trigger-add-task/', trigger_add_task, name='trigger-add-task'), path('trigger-long-task/', trigger_long_running_task, name='trigger-long-task'), ]
- 在
myproject/urls.py
中:
# myproject/urls.py from django.contrib import admin from django.urls import path, include urlpatterns = [ path('admin/', admin.site.urls), path('tasks/', include('myapp.urls')), ]
更新 Celery 配置
確保 settings.py
中配置了 Celery:
# myproject/settings.py CELERY_BROKER_URL = 'redis://localhost:6379/0' # 使用 Redis 作為示例,可以根據(jù)需求更改 CELERY_RESULT_BACKEND = 'django-db'
啟動(dòng) Celery Worker 和 Django 服務(wù)器
確保已經(jīng)啟動(dòng)了 Redis 服務(wù):
redis-server
然后分別啟動(dòng) Django 服務(wù)器和 Celery Worker:
# 啟動(dòng) Django 服務(wù)器 python manage.py runserver # 啟動(dòng) Celery Worker celery -A myproject worker -l info
觸發(fā)任務(wù)并在 Django Admin 界面中查看
打開瀏覽器并訪問以下 URL 以觸發(fā)任務(wù):
http://127.0.0.1:8000/tasks/trigger-add-task/
- 觸發(fā)增加任務(wù)http://127.0.0.1:8000/tasks/trigger-long-task/
- 觸發(fā)長(zhǎng)時(shí)間運(yùn)行任務(wù)
通過這些 URL 觸發(fā) Celery 任務(wù)。然后可以通過 Django Admin 界面進(jìn)行查詢、查看、重試和終止這些任務(wù)。
在 Django Admin 界面查看任務(wù)狀態(tài)
打開瀏覽器并訪問 http://127.0.0.1:8000/admin/
,登陸 Django Admin 界面,導(dǎo)航到 Task Results
部分。應(yīng)該能看到適當(dāng)?shù)娜蝿?wù)列表,并通過之前在自定義 TaskResultAdmin
中定義的操作進(jìn)行重試和終止任務(wù)。
這些步驟能夠通過 Django 和 Celery 演示觸發(fā)任務(wù)并在 Django Admin 界面中進(jìn)行查詢、查看、重試、終止等操作。
啟動(dòng)遠(yuǎn)程的 Celery Worker
要通過 Django Admin 管理和監(jiān)控在遠(yuǎn)程服務(wù)器上單獨(dú)運(yùn)行且由獨(dú)立代碼倉庫維護(hù)的 Celery Worker,需要配置和協(xié)調(diào)多個(gè)獨(dú)立的系統(tǒng)。
安裝 Celery Worker
在遠(yuǎn)程服務(wù)器上,創(chuàng)建一個(gè)獨(dú)立的項(xiàng)目(假設(shè)名字為 worker_project
),并安裝所需的依賴:
# 在遠(yuǎn)程服務(wù)器上 python -m venv venv source venv/bin/activate pip install celery redis
配置 Celery Worker
- 在
worker_project
內(nèi)部配置 Celery(worker_project/celery.py
):
from __future__ import absolute_import, unicode_literals import os from celery import Celery # 設(shè)置 Django 的配置模塊 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'worker_project.settings') app = Celery('worker_project') app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print(f'Request: {self.request!r}')
- 在
worker_project/settings.py
中配置 Celery:
CELERY_BROKER_URL = 'redis://your_redis_server:6379/0' # 替換為實(shí)際的 Redis 地址 CELERY_RESULT_BACKEND = 'redis://your_redis_server:6379/0'
定義任務(wù)
創(chuàng)建一些測(cè)試任務(wù)(worker_project/tasks.py
):
from celery import shared_task import time @shared_task def add(x, y): time.sleep(10) # 模擬長(zhǎng)時(shí)間運(yùn)行的任務(wù) return x + y @shared_task def long_running_task(duration): time.sleep(duration) return f"Task completed after {duration} seconds"
啟動(dòng) Celery Worker
celery -A worker_project worker -l info
啟動(dòng)和測(cè)試
啟動(dòng)本地 Django 服務(wù)器:
python manage.py runserver
確保遠(yuǎn)程服務(wù)器上的 Celery Worker 已經(jīng)在運(yùn)行。
觸發(fā)任務(wù)并在 Django Admin 中查看:
- 訪問
http://127.0.0.1:8000/tasks/trigger-add-task/
- 觸發(fā)增加任務(wù) - 訪問
http://127.0.0.1:8000/tasks/trigger-long-task/
- 觸發(fā)長(zhǎng)時(shí)間運(yùn)行任務(wù)
通過 http://127.0.0.1:8000/admin/
登錄 Django Admin 界面,導(dǎo)航到 Task Results
部分,您應(yīng)該能看到這些任務(wù)并管理它們(如重試和終止)。
以上配置實(shí)現(xiàn)了在本地的 Django 項(xiàng)目中通過 Django Admin 管理和監(jiān)控在遠(yuǎn)程服務(wù)器上單獨(dú)運(yùn)行的 Celery Worker,并通過 Redis 進(jìn)行通信。這種架構(gòu)可以在實(shí)際生產(chǎn)環(huán)境中更好地分離職責(zé)并提高系統(tǒng)的健壯性和擴(kuò)展性。
使用 Django Admin 管理 Flask 啟動(dòng)的 Celery Worker 的常見問題
在使用 Flask App 啟動(dòng)遠(yuǎn)程 Celery Worker,并在 Django Admin 對(duì)這些 Worker 進(jìn)行監(jiān)控和管理時(shí),可能會(huì)遇到諸如 Django Admin 界面沒有顯示 Celery Worker 任務(wù)和任務(wù)執(zhí)行結(jié)果的問題,可能有以下幾個(gè)原因:
- 結(jié)果后端配置錯(cuò)誤:確保 Flask 和 Django 使用相同的結(jié)果后端(result backend)。
- Django 配置錯(cuò)誤:確保 Django 已正確配置 Celery 結(jié)果后端。
- Flask 應(yīng)用沒有保存結(jié)果:確保 Flask 的 Celery 配置沒有禁用結(jié)果保存功能。
要修復(fù)這個(gè)問題,請(qǐng)按以下步驟檢查和修正設(shè)置:
檢查 Flask 應(yīng)用的 Celery 配置
確保 Flask 應(yīng)用中的 Celery 配置了正確的結(jié)果后端,并且沒有禁用任務(wù)結(jié)果的存儲(chǔ)。例如:
celery_app = Celery( configs.celery.name, task_cls=FlaskTask, broker=app.config["CELERY_BROKER_URL"], backend=app.config["CELERY_BACKEND"], # 確保配置了結(jié)果后端 task_ignore_result=False, # 確保不忽略任務(wù)結(jié)果 ) celery_app.conf.update( result_backend=app.config["CELERY_RESULT_BACKEND"], # 確保配置了結(jié)果后端 broker_connection_retry_on_startup=True, ) # 確保沒有不必要的配置禁用結(jié)果存儲(chǔ)
檢查 Django 的 Celery 配置
在 settings.py
中,確保定義了 Celery 結(jié)果后端,并且配置與 Flask 中的配置一致:
# Celery 配置 CELERY_BROKER_URL = 'redis://localhost:6379/0' # 替換為實(shí)際的 Broker URL CELERY_RESULT_BACKEND = 'django-db' # 使用 Django 數(shù)據(jù)庫作為結(jié)果后端 CELERY_CACHE_BACKEND = 'django-cache' CELERY_RESULT_PERSISTENT = True # 安裝的應(yīng)用程序 INSTALLED_APPS = [ # 其他應(yīng)用 'django_celery_results', 'django_celery_beat', ] # 其他配置
同步數(shù)據(jù)庫
確保 Django 數(shù)據(jù)庫與 Celery 結(jié)果模型一致:
python manage.py migrate django_celery_results python manage.py migrate django_celery_beat
確保 Django Admin 中注冊(cè)了相關(guān)模型
確保在 admin.py
中注冊(cè)了 django_celery_results
和 django_celery_beat
的模型,以便在 Admin 界面中查看:
from django.contrib import admin from django_celery_results.models import TaskResult from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule admin.site.register(TaskResult) admin.site.register(PeriodicTask) admin.site.register(IntervalSchedule) admin.site.register(CrontabSchedule)
測(cè)試 Celery 任務(wù)
確保從 Flask 發(fā)送的 Celery 任務(wù)能正確存儲(chǔ)結(jié)果:
@celery_app.task(bind=True) def debug_task(self, *args, **kwargs): print(f'Request: {self.request!r}') return 'Test Result'
在 Flask 應(yīng)用中調(diào)用這個(gè)任務(wù):
debug_task.delay()
然后檢查 Django Admin 界面中的任務(wù)結(jié)果是否顯示。
檢查 Celery Worker 配置
確保 celery worker
是在一個(gè)共享的 Broker 和 Backend 上運(yùn)行:
celery -A your_flask_app_name worker --loglevel=info
通過這些步驟,應(yīng)該能確保在 Django Admin 界面中正確顯示 Flask 應(yīng)用中 Celery Worker 發(fā)起的任務(wù)和任務(wù)執(zhí)行結(jié)果。
如果問題仍然存在,檢查日志和配置是否有任何錯(cuò)誤,并確保 Flask 和 Django 的所有 Celery 配置和數(shù)據(jù)庫訪問是有效且一致的。
總結(jié)
通過 Django Admin 管理 Celery Worker 任務(wù)是一種方便的方式,可以通過簡(jiǎn)單的配置和定制來實(shí)現(xiàn)任務(wù)的查詢、查看、重試和終止等操作。
通過本文提供的步驟和示例,您可以輕松地在 Django 項(xiàng)目中集成 Celery Worker,并通過 Django Admin 界面對(duì)任務(wù)進(jìn)行管理和監(jiān)控。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Python異常對(duì)代碼運(yùn)行性能的影響實(shí)例解析
這篇文章主要介紹了Python異常對(duì)代碼運(yùn)行性能的影響實(shí)例解析,分享了相關(guān)代碼示例,小編覺得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-02-02訊飛webapi語音識(shí)別接口調(diào)用示例代碼(python)
這篇文章主要介紹了如何使用Python3調(diào)用訊飛WebAPI語音識(shí)別接口,重點(diǎn)解決了在處理語音識(shí)別結(jié)果時(shí)判斷是否為最后一幀的問題,通過運(yùn)行代碼并總結(jié)經(jīng)驗(yàn),解決了常見的模塊和屬性錯(cuò)誤,需要的朋友可以參考下2025-03-03python 實(shí)現(xiàn)提取某個(gè)索引中某個(gè)時(shí)間段的數(shù)據(jù)方法
今天小編就為大家分享一篇python 實(shí)現(xiàn)提取某個(gè)索引中某個(gè)時(shí)間段的數(shù)據(jù)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-02-02python爬蟲入門教程--快速理解HTTP協(xié)議(一)
http協(xié)議是互聯(lián)網(wǎng)里面最重要,最基礎(chǔ)的協(xié)議之一,我們的爬蟲需要經(jīng)常和http協(xié)議打交道。下面這篇文章主要給大家介紹了關(guān)于python爬蟲入門之快速理解HTTP協(xié)議的相關(guān)資料,文中介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來一起看看吧。2017-05-05python爬蟲MeterSphere平臺(tái)執(zhí)行報(bào)告使用進(jìn)階
這篇文章主要為大家介紹了python爬蟲MeterSphere平臺(tái)執(zhí)行報(bào)告使用進(jìn)階示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12初步認(rèn)識(shí)Python中的列表與位運(yùn)算符
這篇文章主要介紹了Python中的列表與位運(yùn)算符,是Python入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-10-10關(guān)于Python中進(jìn)度條的六個(gè)實(shí)用技巧分享
在項(xiàng)目開發(fā)過程中加載、啟動(dòng)、下載項(xiàng)目難免會(huì)用到進(jìn)度條,下面這篇文章主要給大家介紹了關(guān)于Python中進(jìn)度條的六個(gè)實(shí)用技巧,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-04-04