欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Django集成Celery之狀態(tài)監(jiān)控與任務(wù)管理詳解

 更新時(shí)間:2025年03月18日 09:13:37   作者:三余知行  
這篇文章主要介紹了Django集成Celery之狀態(tài)監(jiān)控與任務(wù)管理詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

如何通過 Django 來管理 Celery 任務(wù)?通過 Django Admin 界面提供任務(wù)的查詢、查看、重試、終止等功能?下面是一個(gè)完整的步驟指南。

使用 Django 管理 Celery Worker

安裝 Django 和相關(guān)包

首先,創(chuàng)建一個(gè)新的虛擬環(huán)境并安裝所需的包。

python -m venv myenv
source myenv/bin/activate  # Windows 系統(tǒng)使用: myenv\Scripts\activate
pip install django django-celery-results django-celery-beat celery

創(chuàng)建 Django 項(xiàng)目和應(yīng)用

django-admin startproject myproject
cd myproject
django-admin startapp myapp

配置 Django 和 Celery

myproject/settings.py 文件中添加以下內(nèi)容:

INSTALLED_APPS = [
    ...,
    'django_celery_results',
    'django_celery_beat',
    'myapp',  # 確保 app 在這個(gè)列表里
]

CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 使用 Redis 作為示例,可以根據(jù)需求更改
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'

CELERY_TRACK_STARTED = True
CELERY_SEND_EVENTS = True

# 確保已經(jīng)配置了數(shù)據(jù)庫
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}

# 配置 Django 緩存(可選)
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
    }
}

myproject 目錄中創(chuàng)建一個(gè) celery.py 文件:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

# 設(shè)置 Django 的配置模塊
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# 從 Django 的設(shè)置中配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自動(dòng)發(fā)現(xiàn)任務(wù)
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

修改 myproject/__init__.py 文件,使得 Django 在啟動(dòng)時(shí)加載 Celery:

from __future__ import absolute_import, unicode_literals

# 這將確保當(dāng) Django 啟動(dòng)時(shí)加載 app.py
from .celery import app as celery_app

__all__ = ('celery_app',)

創(chuàng)建一個(gè) Celery 任務(wù)

myapp/tasks.py 中創(chuàng)建一個(gè)簡(jiǎn)單的 Celery 任務(wù):

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

注冊(cè)自定義的 TaskResultAdmin

我們需要在自定義 TaskResultAdmin 之前先取消已經(jīng)注冊(cè)的模型。

myapp/admin.py 中做如下修改:

from django.contrib import admin
from django_celery_results.models import TaskResult
from django_celery_results.admin import TaskResultAdmin as DefaultTaskResultAdmin
from django.urls import path
from django.shortcuts import redirect
from celery.result import AsyncResult
from myproject.celery import app

# 取消已經(jīng)注冊(cè)的 TaskResult
admin.site.unregister(TaskResult)

# 創(chuàng)建一個(gè)自定義的 TaskResultAdmin 繼承自默認(rèn)的 TaskResultAdmin
class CustomTaskResultAdmin(DefaultTaskResultAdmin):
    change_list_template = "admin/celery_task_changelist.html"

    def get_urls(self):
        urls = super().get_urls()
        custom_urls = [
            path('retry/<task_id>/', self.admin_site.admin_view(self.retry_task), name='retry-task'),
            path('terminate/<task_id>/', self.admin_site.admin_view(self.terminate_task), name='terminate-task'),
        ]
        return custom_urls + urls

    def retry_task(self, request, task_id, *args, **kwargs):
        AsyncResult(task_id, app=app).reapply()
        self.message_user(request, f'Task {task_id} retried successfully.')
        return redirect('..')

    def terminate_task(self, request, task_id, *args, **kwargs):
        AsyncResult(task_id, app=app).revoke(terminate=True)
        self.message_user(request, f'Task {task_id} terminated successfully.')
        return redirect('..')

# 注冊(cè)自定義的 TaskResultAdmin
admin.site.register(TaskResult, CustomTaskResultAdmin)

TaskResult 模型已經(jīng)被 django_celery_results 自動(dòng)注冊(cè)到 Django Admin 中了。

我們可以通過繼承 django_celery_resultsTaskResultAdmin 并覆蓋的方式來避免重復(fù)注冊(cè)模型。

創(chuàng)建 Django Admin 界面的自定義模板

在 Django 項(xiàng)目中創(chuàng)建以下目錄結(jié)構(gòu) templates/admin 并在 admin 文件夾內(nèi)創(chuàng)建 celery_task_changelist.html

{% extends "admin/change_list.html" %} {% block result_list %} {{ block.super }}
<script>
  function handleTask(action, task_id) {
    fetch(`/${action}/${task_id}/`, {
      method: 'POST',
      headers: {
        'X-CSRFToken': document.querySelector('[name=csrfmiddlewaretoken]')
          .value,
      },
    }).then((response) => {
      if (response.ok) {
        location.reload();
      } else {
        alert('Action failed.');
      }
    });
  }
</script>
<div>
  <form method="post">
    {% csrf_token %} {% for result in cl.result_list %}
    <button type="button" onclick="handleTask('retry', '{{ result.task_id }}')">
      Retry
    </button>
    <button
      type="button"
      onclick="handleTask('terminate', '{{ result.task_id }}')"
    >
      Terminate
    </button>
    {% endfor %}
  </form>
</div>
{% endblock %}

確保自定義的模板路徑正確。對(duì)于默認(rèn) Django 項(xiàng)目模板目錄,模板文件夾應(yīng)該在 myproject/templates/admin/celery_task_changelist.html。

運(yùn)行 Django 和 Celery

  • 應(yīng)用數(shù)據(jù)庫遷移:
python manage.py migrate
  • 啟動(dòng) Django 服務(wù)器:
python manage.py runserver
  • 啟動(dòng) Celery worker:
celery -A myproject worker -l info

使用 Django Admin 管理 Celery 任務(wù)

打開瀏覽器并訪問 http://127.0.0.1:8000/admin/,Celery 任務(wù)將會(huì)在 Django admin 界面中顯示,并且可以通過點(diǎn)擊按鈕來進(jìn)行查詢、查看、重試和終止等操作。

這樣就完成了通過 Django Admin 界面管理 Celery 任務(wù)的完整步驟。如有需要可以進(jìn)一步定制和優(yōu)化界面和功能。

啟動(dòng) Django 本地的 Celery Worker

為了在啟動(dòng) Celery Worker 后向 Worker 發(fā)起任務(wù),并在 Django Admin 界面演示查詢、查看、重試和終止任務(wù),可以按以下步驟進(jìn)行操作:

創(chuàng)建 Celery 任務(wù)

myapp/tasks.py 中定義一些示例任務(wù):

# myapp/tasks.py
from celery import shared_task
import time

@shared_task
def add(x, y):
    time.sleep(10)  # 模擬長(zhǎng)時(shí)間運(yùn)行的任務(wù)
    return x + y

@shared_task
def long_running_task(duration):
    time.sleep(duration)
    return f"Task completed after {duration} seconds"# myapp/tasks.py
from celery import shared_task
import time

@shared_task
def add(x, y):
    time.sleep(10)  # 模擬長(zhǎng)時(shí)間運(yùn)行的任務(wù)
    return x + y

@shared_task
def long_running_task(duration):
    time.sleep(duration)
    return f"Task completed after {duration} seconds"

創(chuàng)建觸發(fā)任務(wù)的視圖

為了便于演示,可以創(chuàng)建一些視圖來觸發(fā)這些任務(wù)。更新 urls.pyviews.py 文件。

  • myapp/views.py 中:
# myapp/views.py
from django.http import JsonResponse
from myapp.tasks import add, long_running_task

def trigger_add_task(request):
    add.delay(3, 4)
    return JsonResponse({'status': 'Task add (3, 4) triggered'})

def trigger_long_running_task(request):
    long_running_task.delay(30)  # 任務(wù)運(yùn)行30秒
    return JsonResponse({'status': 'Long running task for 30 seconds triggered'})
  • myapp/urls.py 中:
# myapp/urls.py
from django.urls import path
from .views import trigger_add_task, trigger_long_running_task

urlpatterns = [
    path('trigger-add-task/', trigger_add_task, name='trigger-add-task'),
    path('trigger-long-task/', trigger_long_running_task, name='trigger-long-task'),
]
  • myproject/urls.py 中:
# myproject/urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('tasks/', include('myapp.urls')),
]

更新 Celery 配置

確保 settings.py 中配置了 Celery:

# myproject/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 使用 Redis 作為示例,可以根據(jù)需求更改
CELERY_RESULT_BACKEND = 'django-db'

啟動(dòng) Celery Worker 和 Django 服務(wù)器

確保已經(jīng)啟動(dòng)了 Redis 服務(wù):

redis-server

然后分別啟動(dòng) Django 服務(wù)器和 Celery Worker:

# 啟動(dòng) Django 服務(wù)器
python manage.py runserver

# 啟動(dòng) Celery Worker
celery -A myproject worker -l info

觸發(fā)任務(wù)并在 Django Admin 界面中查看

打開瀏覽器并訪問以下 URL 以觸發(fā)任務(wù):

  • http://127.0.0.1:8000/tasks/trigger-add-task/ - 觸發(fā)增加任務(wù)
  • http://127.0.0.1:8000/tasks/trigger-long-task/ - 觸發(fā)長(zhǎng)時(shí)間運(yùn)行任務(wù)

通過這些 URL 觸發(fā) Celery 任務(wù)。然后可以通過 Django Admin 界面進(jìn)行查詢、查看、重試和終止這些任務(wù)。

在 Django Admin 界面查看任務(wù)狀態(tài)

打開瀏覽器并訪問 http://127.0.0.1:8000/admin/,登陸 Django Admin 界面,導(dǎo)航到 Task Results 部分。應(yīng)該能看到適當(dāng)?shù)娜蝿?wù)列表,并通過之前在自定義 TaskResultAdmin 中定義的操作進(jìn)行重試和終止任務(wù)。

這些步驟能夠通過 Django 和 Celery 演示觸發(fā)任務(wù)并在 Django Admin 界面中進(jìn)行查詢、查看、重試、終止等操作。

啟動(dòng)遠(yuǎn)程的 Celery Worker

要通過 Django Admin 管理和監(jiān)控在遠(yuǎn)程服務(wù)器上單獨(dú)運(yùn)行且由獨(dú)立代碼倉庫維護(hù)的 Celery Worker,需要配置和協(xié)調(diào)多個(gè)獨(dú)立的系統(tǒng)。

安裝 Celery Worker

在遠(yuǎn)程服務(wù)器上,創(chuàng)建一個(gè)獨(dú)立的項(xiàng)目(假設(shè)名字為 worker_project),并安裝所需的依賴:

# 在遠(yuǎn)程服務(wù)器上
python -m venv venv
source venv/bin/activate
pip install celery redis

配置 Celery Worker

  • worker_project 內(nèi)部配置 Celery(worker_project/celery.py):
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 設(shè)置 Django 的配置模塊
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'worker_project.settings')

app = Celery('worker_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
  • worker_project/settings.py 中配置 Celery:
CELERY_BROKER_URL = 'redis://your_redis_server:6379/0'  # 替換為實(shí)際的 Redis 地址
CELERY_RESULT_BACKEND = 'redis://your_redis_server:6379/0'

定義任務(wù)

創(chuàng)建一些測(cè)試任務(wù)(worker_project/tasks.py):

from celery import shared_task
import time

@shared_task
def add(x, y):
    time.sleep(10)  # 模擬長(zhǎng)時(shí)間運(yùn)行的任務(wù)
    return x + y

@shared_task
def long_running_task(duration):
    time.sleep(duration)
    return f"Task completed after {duration} seconds"

啟動(dòng) Celery Worker

celery -A worker_project worker -l info

啟動(dòng)和測(cè)試

啟動(dòng)本地 Django 服務(wù)器:

python manage.py runserver

確保遠(yuǎn)程服務(wù)器上的 Celery Worker 已經(jīng)在運(yùn)行。

觸發(fā)任務(wù)并在 Django Admin 中查看:

  • 訪問 http://127.0.0.1:8000/tasks/trigger-add-task/ - 觸發(fā)增加任務(wù)
  • 訪問 http://127.0.0.1:8000/tasks/trigger-long-task/ - 觸發(fā)長(zhǎng)時(shí)間運(yùn)行任務(wù)

通過 http://127.0.0.1:8000/admin/ 登錄 Django Admin 界面,導(dǎo)航到 Task Results 部分,您應(yīng)該能看到這些任務(wù)并管理它們(如重試和終止)。

以上配置實(shí)現(xiàn)了在本地的 Django 項(xiàng)目中通過 Django Admin 管理和監(jiān)控在遠(yuǎn)程服務(wù)器上單獨(dú)運(yùn)行的 Celery Worker,并通過 Redis 進(jìn)行通信。這種架構(gòu)可以在實(shí)際生產(chǎn)環(huán)境中更好地分離職責(zé)并提高系統(tǒng)的健壯性和擴(kuò)展性。

使用 Django Admin 管理 Flask 啟動(dòng)的 Celery Worker 的常見問題

在使用 Flask App 啟動(dòng)遠(yuǎn)程 Celery Worker,并在 Django Admin 對(duì)這些 Worker 進(jìn)行監(jiān)控和管理時(shí),可能會(huì)遇到諸如 Django Admin 界面沒有顯示 Celery Worker 任務(wù)和任務(wù)執(zhí)行結(jié)果的問題,可能有以下幾個(gè)原因:

  1. 結(jié)果后端配置錯(cuò)誤:確保 Flask 和 Django 使用相同的結(jié)果后端(result backend)。
  2. Django 配置錯(cuò)誤:確保 Django 已正確配置 Celery 結(jié)果后端。
  3. Flask 應(yīng)用沒有保存結(jié)果:確保 Flask 的 Celery 配置沒有禁用結(jié)果保存功能。

要修復(fù)這個(gè)問題,請(qǐng)按以下步驟檢查和修正設(shè)置:

檢查 Flask 應(yīng)用的 Celery 配置

確保 Flask 應(yīng)用中的 Celery 配置了正確的結(jié)果后端,并且沒有禁用任務(wù)結(jié)果的存儲(chǔ)。例如:

celery_app = Celery(
    configs.celery.name,
    task_cls=FlaskTask,
    broker=app.config["CELERY_BROKER_URL"],
    backend=app.config["CELERY_BACKEND"],  # 確保配置了結(jié)果后端
    task_ignore_result=False,  # 確保不忽略任務(wù)結(jié)果
)

celery_app.conf.update(
    result_backend=app.config["CELERY_RESULT_BACKEND"],  # 確保配置了結(jié)果后端
    broker_connection_retry_on_startup=True,
)

# 確保沒有不必要的配置禁用結(jié)果存儲(chǔ)

檢查 Django 的 Celery 配置

settings.py 中,確保定義了 Celery 結(jié)果后端,并且配置與 Flask 中的配置一致:

# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 替換為實(shí)際的 Broker URL
CELERY_RESULT_BACKEND = 'django-db'  # 使用 Django 數(shù)據(jù)庫作為結(jié)果后端
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_RESULT_PERSISTENT = True

# 安裝的應(yīng)用程序
INSTALLED_APPS = [
    # 其他應(yīng)用
    'django_celery_results',
    'django_celery_beat',
]

# 其他配置

同步數(shù)據(jù)庫

確保 Django 數(shù)據(jù)庫與 Celery 結(jié)果模型一致:

python manage.py migrate django_celery_results
python manage.py migrate django_celery_beat

確保 Django Admin 中注冊(cè)了相關(guān)模型

確保在 admin.py 中注冊(cè)了 django_celery_resultsdjango_celery_beat 的模型,以便在 Admin 界面中查看:

from django.contrib import admin
from django_celery_results.models import TaskResult
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule

admin.site.register(TaskResult)
admin.site.register(PeriodicTask)
admin.site.register(IntervalSchedule)
admin.site.register(CrontabSchedule)

測(cè)試 Celery 任務(wù)

確保從 Flask 發(fā)送的 Celery 任務(wù)能正確存儲(chǔ)結(jié)果:

@celery_app.task(bind=True)
def debug_task(self, *args, **kwargs):
    print(f'Request: {self.request!r}')
    return 'Test Result'

在 Flask 應(yīng)用中調(diào)用這個(gè)任務(wù):

debug_task.delay()

然后檢查 Django Admin 界面中的任務(wù)結(jié)果是否顯示。

檢查 Celery Worker 配置

確保 celery worker 是在一個(gè)共享的 Broker 和 Backend 上運(yùn)行:

celery -A your_flask_app_name worker --loglevel=info

通過這些步驟,應(yīng)該能確保在 Django Admin 界面中正確顯示 Flask 應(yīng)用中 Celery Worker 發(fā)起的任務(wù)和任務(wù)執(zhí)行結(jié)果。

如果問題仍然存在,檢查日志和配置是否有任何錯(cuò)誤,并確保 Flask 和 Django 的所有 Celery 配置和數(shù)據(jù)庫訪問是有效且一致的。

總結(jié)

通過 Django Admin 管理 Celery Worker 任務(wù)是一種方便的方式,可以通過簡(jiǎn)單的配置和定制來實(shí)現(xiàn)任務(wù)的查詢、查看、重試和終止等操作。

通過本文提供的步驟和示例,您可以輕松地在 Django 項(xiàng)目中集成 Celery Worker,并通過 Django Admin 界面對(duì)任務(wù)進(jìn)行管理和監(jiān)控。

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Python異常對(duì)代碼運(yùn)行性能的影響實(shí)例解析

    Python異常對(duì)代碼運(yùn)行性能的影響實(shí)例解析

    這篇文章主要介紹了Python異常對(duì)代碼運(yùn)行性能的影響實(shí)例解析,分享了相關(guān)代碼示例,小編覺得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下
    2018-02-02
  • 訊飛webapi語音識(shí)別接口調(diào)用示例代碼(python)

    訊飛webapi語音識(shí)別接口調(diào)用示例代碼(python)

    這篇文章主要介紹了如何使用Python3調(diào)用訊飛WebAPI語音識(shí)別接口,重點(diǎn)解決了在處理語音識(shí)別結(jié)果時(shí)判斷是否為最后一幀的問題,通過運(yùn)行代碼并總結(jié)經(jīng)驗(yàn),解決了常見的模塊和屬性錯(cuò)誤,需要的朋友可以參考下
    2025-03-03
  • python 實(shí)現(xiàn)提取某個(gè)索引中某個(gè)時(shí)間段的數(shù)據(jù)方法

    python 實(shí)現(xiàn)提取某個(gè)索引中某個(gè)時(shí)間段的數(shù)據(jù)方法

    今天小編就為大家分享一篇python 實(shí)現(xiàn)提取某個(gè)索引中某個(gè)時(shí)間段的數(shù)據(jù)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2019-02-02
  • python爬蟲入門教程--快速理解HTTP協(xié)議(一)

    python爬蟲入門教程--快速理解HTTP協(xié)議(一)

    http協(xié)議是互聯(lián)網(wǎng)里面最重要,最基礎(chǔ)的協(xié)議之一,我們的爬蟲需要經(jīng)常和http協(xié)議打交道。下面這篇文章主要給大家介紹了關(guān)于python爬蟲入門之快速理解HTTP協(xié)議的相關(guān)資料,文中介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面來一起看看吧。
    2017-05-05
  • python爬蟲MeterSphere平臺(tái)執(zhí)行報(bào)告使用進(jìn)階

    python爬蟲MeterSphere平臺(tái)執(zhí)行報(bào)告使用進(jìn)階

    這篇文章主要為大家介紹了python爬蟲MeterSphere平臺(tái)執(zhí)行報(bào)告使用進(jìn)階示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-12-12
  • Python decimal模塊的使用示例詳解

    Python decimal模塊的使用示例詳解

    decimal 模塊decimal意思為十進(jìn)制,這個(gè)模塊提供了十進(jìn)制浮點(diǎn)運(yùn)算支持,本篇文章主要給大家講解Python decimal模塊的使用,需要的朋友可以參考下
    2023-03-03
  • python中random模塊詳解

    python中random模塊詳解

    Python中的random模塊用于生成隨機(jī)數(shù),它提供了很多函數(shù),本文給大家分享常用函數(shù)總結(jié),感興趣的朋友跟隨小編一起看看吧
    2021-03-03
  • 初步認(rèn)識(shí)Python中的列表與位運(yùn)算符

    初步認(rèn)識(shí)Python中的列表與位運(yùn)算符

    這篇文章主要介紹了Python中的列表與位運(yùn)算符,是Python入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下
    2015-10-10
  • 關(guān)于Python中進(jìn)度條的六個(gè)實(shí)用技巧分享

    關(guān)于Python中進(jìn)度條的六個(gè)實(shí)用技巧分享

    在項(xiàng)目開發(fā)過程中加載、啟動(dòng)、下載項(xiàng)目難免會(huì)用到進(jìn)度條,下面這篇文章主要給大家介紹了關(guān)于Python中進(jìn)度條的六個(gè)實(shí)用技巧,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-04-04
  • PyCharm 專業(yè)版安裝圖文教程

    PyCharm 專業(yè)版安裝圖文教程

    這篇文章主要介紹了PyCharm 專業(yè)版安裝圖文教程,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-02-02

最新評(píng)論