欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Django中使用Celery的方法示例

 更新時間:2018年11月29日 08:27:21   作者:棲遲于一丘  
這篇文章主要介紹了Django中使用Celery的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

起步

在 《分布式任務隊列Celery使用說明》 中介紹了在 Python 中使用 Celery 來實驗異步任務和定時任務功能。本文介紹如何在 Django 中使用 Celery。

安裝

pip install django-celery

這個命令使用的依賴是 Celery 3.x 的版本,所以會把我之前安裝的 4.x 卸載,不過對功能上并沒有什么影響。我們也完全可以僅用Celery在django中使用,但使用 django-celery 模塊能更好的管理 celery。

使用

可以把有關 Celery 的配置放到 settings.py 里去,但我比較習慣單獨一個文件來放,然后在 settings.py 引入進來:

# celery_config.py
import djcelery
import os

os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
djcelery.setup_loader()

BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

# UTC
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'

CELERY_IMPORTS = (
 'app.tasks',
)

# 有些情況可以防止死鎖
CELERY_FORCE_EXECV = True

# 設置并發(fā)的worker數(shù)量
CELERYD_CONCURRENCY = 4

# 任務發(fā)送完成是否需要確認,這一項對性能有一點影響
CELERY_ACKS_LATE = True

# 每個worker執(zhí)行了多少任務就會銷毀,防止內(nèi)存泄露,默認是無限的
CELERYD_MAX_TASKS_PER_CHILD = 40

# 規(guī)定完成任務的時間
CELERYD_TASK_TIME_LIMIT = 15 * 60 # 在15分鐘內(nèi)完成任務,否則執(zhí)行該任務的worker將被殺死,任務移交給父進程

# 設置默認的隊列名稱,如果一個消息不符合其他的隊列就會放在默認隊列里面,如果什么都不設置的話,數(shù)據(jù)都會發(fā)送到默認的隊列中
CELERY_DEFAULT_QUEUE = "default"

# 設置詳細的隊列
CELERY_QUEUES = {
 "default": { # 這是上面指定的默認隊列
  "exchange": "default",
  "exchange_type": "direct",
  "routing_key": "default"
 },
 "beat_queue": {
  "exchange": "beat_queue",
  "exchange_type": "direct",
  "routing_key": "beat_queue"
 }

}

配置文件中設置了 CELERY_IMPORTS 導入的任務,所以在django app中創(chuàng)建相應的任務文件:

# app/tasks.py
from celery.task import Task
import time

class TestTask(Task):
 name = 'test-task' # 給任務設置個自定義名稱

 def run(self, *args, **kwargs):
  print('start test task')
  time.sleep(4)
  print('args={}, kwargs={}'.format(args, kwargs))
  print('end test task')

在 settings.py 添加:

INSTALLED_APPS = [
 # ...
 'djcelery',
]

# Celery
from learn_django.celery_config import *

觸發(fā)任務或提交任務可以在view中來調(diào)用:

# views.py
from django.http import HttpResponse
from app.tasks import TestTask

def test_task(request):
 # 執(zhí)行異步任務
 print('start do request')
 t = TestTask()
 t.delay()
 print('end do request')
 return HttpResponse('ok')

啟動 woker 的命令是:

python manage.py celery worker -l info

再啟動django,訪問該view,可以看到任務在worker中被消費了。

定時任務

在celery的配置文件 celery_config.py 文件中添加:

CELERYBEAT_SCHEDULE = {
 'task1-every-1-min': { # 自定義名稱
  'task': 'test-task', # 與任務中name名稱一致
  'schedule': datetime.timedelta(seconds=5),
  'args': (2, 15),
  'options': {
   'queue': 'beat_queue', # 指定要使用的隊列
  }
 },

}

通過 options 的 queque 來指定要使用的隊列,這里需要單獨的隊列是因為,如果所有任務都使用同一隊列,對于定時任務來說,任務提交后會位于隊列尾部,任務的執(zhí)行時間會靠后,所以對于定時任務來說,使用單獨的隊列。

啟動 beat:

python manage.py celery beat -l info

監(jiān)控工具 flower

如果celery中的任務執(zhí)行失敗了,有些場景是需要對這些任務進行監(jiān)控, flower 是基于 Tornado 開發(fā)的web應用。安裝用 pip install flower ;啟動它可以是:

python manage.py celery flower

# python manage.py celery flower --basic_auth=admin:admin

用瀏覽器訪問 http://localhost:5555 即可查看:

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

相關文章

最新評論