Django使用Celery異步任務(wù)隊(duì)列的使用
1 Celery簡介
Celery是異步任務(wù)隊(duì)列,可以獨(dú)立于主進(jìn)程運(yùn)行,在主進(jìn)程退出后,也不影響隊(duì)列中的任務(wù)執(zhí)行。
任務(wù)執(zhí)行異常退出,重新啟動(dòng)后,會(huì)繼續(xù)執(zhí)行隊(duì)列中的其他任務(wù),同時(shí)可以緩存停止期間接收的工作任務(wù),這個(gè)功能依賴于消息隊(duì)列(MQ、Redis)。
1.1 Celery原理
Celery的 架構(gòu) 由三部分組成,消息中間件(message broker),任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(chǔ)(task result store)組成。
消息中間件:Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成。包括, RabbitMQ , Redis , MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ。推薦使用:RabbitMQ、Redis作為消息隊(duì)列。
任務(wù)執(zhí)行單元:Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中。
任務(wù)結(jié)果存儲(chǔ):Task result store用來存儲(chǔ)Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲(chǔ)任務(wù)的結(jié)果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache
1.2Celery適用場景
異步任務(wù)處理:例如給注冊(cè)用戶發(fā)送短消息或者確認(rèn)郵件任務(wù)。 大型任務(wù):執(zhí)行時(shí)間較長的任務(wù),例如視頻和圖片處理,添加水印和轉(zhuǎn)碼等,需要執(zhí)行任務(wù)時(shí)間長。 定時(shí)執(zhí)行的任務(wù):支持任務(wù)的定時(shí)執(zhí)行和設(shè)定時(shí)間執(zhí)行。例如性能壓測定時(shí)執(zhí)行。
2Celery開發(fā)環(huán)境準(zhǔn)備
2.1 環(huán)境準(zhǔn)備
軟件名稱 |
版本號(hào) |
說明 |
Linux |
Centos 6.5(64bit) |
操作系統(tǒng) |
Python |
3.5.2 |
|
Django |
1.10 |
Web框架 |
Celery |
4.0.2 |
異步任務(wù)隊(duì)列 |
Redis |
2.4 |
消息隊(duì)列 |
2.2 Celery安裝
使用方法介紹:
Celery的運(yùn)行依賴消息隊(duì)列,使用時(shí)需要安裝redis或者rabbit。
這里我們使用Redis。安裝redis庫:
sudo yum install redis
啟動(dòng)redis:
sudo service redis start
安裝celery庫
sudo pip install celery==4.0.2
3Celery單獨(dú)執(zhí)行任務(wù)
3.1編寫任務(wù)
創(chuàng)建task.py文件
說明:這里初始Celery實(shí)例時(shí)就加載了配置,使用的redis作為消息隊(duì)列和存儲(chǔ)任務(wù)結(jié)果。
運(yùn)行celery:
$ celery -A task worker --loglevel=info
看到下面的打印,說明celery成功運(yùn)行。
3.2 調(diào)用任務(wù)
直接打開python交互命令行
執(zhí)行下面代碼:
可以celery的窗口看到任務(wù)的執(zhí)行信息
任務(wù)執(zhí)行狀態(tài)監(jiān)控和獲取結(jié)果:
3.3任務(wù)調(diào)用方法總結(jié)
有兩種方法:
delay和apply_async ,delay方法是apply_async簡化版。
add.delay(2, 2) add.apply_async((2, 2)) add.apply_async((2, 2), queue='lopri')
delay方法是apply_async簡化版本。
apply_async方法是可以帶非常多的配置參數(shù),包括指定隊(duì)列等
Queue 指定隊(duì)列名稱,可以把不同任務(wù)分配到不同的隊(duì)列 3.4 任務(wù)狀態(tài)
每個(gè)任務(wù)有三種狀態(tài):PENDING -> STARTED -> SUCCESS
任務(wù)查詢狀態(tài):res.state
來查詢?nèi)蝿?wù)的狀態(tài)
4與Django集成
上面簡單介紹了celery異步任務(wù)的基本方法,結(jié)合我們實(shí)際的應(yīng)用,我們需要與Django一起使用,下面介紹如何與Django結(jié)合。
4.1與Django集成方法
與Django集成有兩種方法:
- Django 1.8 以上版本:與Celery 4.0版本集成
- Django 1.8 以下版本:與Celery3.1版本集成,使用django-celery庫
今天我們介紹celery4.0 和django 1.8以上版本集成方法。
4.2 創(chuàng)建項(xiàng)目文件
創(chuàng)建一個(gè)項(xiàng)目:名字叫做proj
- proj/ - proj/__init__.py - proj/settings.py - proj/urls.py - proj/wsgi.py - manage.py
創(chuàng)建一個(gè)新的文件: proj/proj/mycelery.py
from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') app = Celery('proj') # Using a string here means the worker don't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks()
在proj/proj/__init__.py:添加
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .mycelery import app as celery_app __all__ = ['celery_app']
4.3 配置Celery
我們?cè)趍ycelery.py文件中說明celery的配置文件在settings.py中,并且是以CELERY開頭。
app.config_from_object('django.conf:settings', namespace='CELERY')
在settings.py文件中添加celery配置:
我們的配置是使用redis作為消息隊(duì)列,消息的代理和結(jié)果都是用redis,任務(wù)的序列化使用json格式。
重要:redis://127.0.0.1:6379/0這個(gè)說明使用的redis的0號(hào)隊(duì)列,如果有多個(gè)celery任務(wù)都使用同一個(gè)隊(duì)列,則會(huì)造成任務(wù)混亂。最好是celery實(shí)例單獨(dú)使用一個(gè)隊(duì)列。
4.4創(chuàng)建APP
創(chuàng)建Django的App,名稱為celery_task,在app目錄下創(chuàng)建tasks.py文件。
完成后目錄結(jié)構(gòu)為:
├── celery_task │ ├── admin.py │ ├── apps.py │ ├── __init__.py │ ├── migrations │ │ └── __init__.py │ ├── models.py │ ├── tasks.py │ ├── tests.py │ └── views.py ├── db.sqlite3 ├── manage.py ├── proj │ ├── celery.py │ ├── __init__.py │ ├── settings.py │ ├── urls.py │ └── wsgi.py └── templates
4.5編寫task任務(wù)
編輯任務(wù)文件
tasks.py
在tasks.py文件中添加下面代碼
# Create your tasks here from __future__ import absolute_import, unicode_literals from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)
啟動(dòng)celery:celery -A proj.mycelery worker -l info
說明:proj 為模塊名稱,mycelery 為celery 的實(shí)例所在的文件。
啟動(dòng)成功打?。?/p>
4.6在views中調(diào)用任務(wù)
在views中編寫接口,實(shí)現(xiàn)兩個(gè)功能:
- 觸發(fā)任務(wù),然后返回任務(wù)的結(jié)果和任務(wù)ID
- 根據(jù)任務(wù)ID查詢?nèi)蝿?wù)狀態(tài)
代碼如下:
啟動(dòng)django。
新開一個(gè)會(huì)話啟動(dòng)celery;啟動(dòng)命令為:celery –A proj.mycelery worker –l info
訪問 http://127.0.0.1:8000/add ,可以看到返回的結(jié)果。
在celery運(yùn)行的頁面,可以看到下面輸出:
4.7在views中查詢?nèi)蝿?wù)狀態(tài)
有的時(shí)候任務(wù)執(zhí)行時(shí)間較長,需要查詢?nèi)蝿?wù)是否執(zhí)行完成,可以根據(jù)任務(wù)的id來查詢?nèi)蝿?wù)狀態(tài),根據(jù)狀態(tài)進(jìn)行下一步操作。
可以看到任務(wù)的狀態(tài)為:SUCCESS
5Celery定時(shí)任務(wù)
Celery作為異步任務(wù)隊(duì)列,我們可以按照我們?cè)O(shè)置的時(shí)間,定時(shí)的執(zhí)行一些任務(wù),例如每日數(shù)據(jù)庫備份,日志轉(zhuǎn)存等。
Celery的定時(shí)任務(wù)配置非常簡單:
定時(shí)任務(wù)的配置依然在setting.py文件中。
說明:如果覺得celery 的數(shù)據(jù)配置文件和Django 的都在setting.py 一個(gè)文件中不方便,可以分拆出來,只需要在mycelery.py 的文件中指明即可。
app.config_from_object('django.conf:yoursettingsfile', namespace='CELERY')
5.1任務(wù)間隔運(yùn)行
#每30秒調(diào)用task.add from datetime import timedelta CELERY_BEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'tasks.add', 'schedule': timedelta(seconds=30), 'args': (16, 16) }, }
5.2定時(shí)執(zhí)行
定時(shí)每天早上7:30分運(yùn)行。
注意:設(shè)置任務(wù)時(shí)間時(shí)注意時(shí)間格式,UTC時(shí)間或者本地時(shí)間。
#crontab任務(wù) #每天7:30調(diào)用task.add from celery.schedules import crontab CELERY_BEAT_SCHEDULE = { # Executes every Monday morning at 7:30 A.M 'add-every-monday-morning': { 'task': 'tasks.add', 'schedule': crontab(hour=7, minute=30), 'args': (16, 16), }, }
5.3定時(shí)任務(wù)啟動(dòng)
配置了定時(shí)任務(wù),除了worker進(jìn)程外,還需要啟動(dòng)一個(gè)beat進(jìn)程。
Beat進(jìn)程的作用就相當(dāng)于一個(gè)定時(shí)任務(wù),根據(jù)配置來執(zhí)行對(duì)應(yīng)的任務(wù)。
5.3.1 啟動(dòng)beat進(jìn)程
命令如下:celery -A proj.mycelery beat -l info
5.3.2 啟動(dòng)worker進(jìn)程
Worker進(jìn)程啟動(dòng)和前面啟動(dòng)命令一樣。celery –A proj.mycelery worker –l info
6 Celery深入
Celery任務(wù)支持多樣的運(yùn)行模式:
- 支持動(dòng)態(tài)指定并發(fā)數(shù) --autoscale=10,3 (always keep 3 processes, but grow to 10 if necessary).
- 支持鏈?zhǔn)饺蝿?wù)
- 支持Group任務(wù)
- 支持任務(wù)不同優(yōu)先級(jí)
- 支持指定任務(wù)隊(duì)列
- 支持使用eventlet模式運(yùn)行worker
例如:指定并發(fā)數(shù)為1000
celery -A proj.mycelery worker -c 1000
這些可以根據(jù)使用的深入自行了解和學(xué)習(xí)。
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python 獲取命令行參數(shù)內(nèi)容及參數(shù)個(gè)數(shù)的實(shí)例
今天小編就為大家分享一篇Python 獲取命令行參數(shù)內(nèi)容及參數(shù)個(gè)數(shù)的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-12-12Python退出While循環(huán)的3種方法舉例詳解
在每次循環(huán)結(jié)束后,我們需要檢查循環(huán)條件是否滿足。如果條件滿足,則繼續(xù)執(zhí)行循環(huán)體內(nèi)的代碼,否則退出循環(huán),這篇文章主要給大家介紹了關(guān)于Python退出While循環(huán)的3種方法,需要的朋友可以參考下2023-10-10python numpy 一維數(shù)組轉(zhuǎn)變?yōu)槎嗑S數(shù)組的實(shí)例
今天小編就為大家分享一篇python numpy 一維數(shù)組轉(zhuǎn)變?yōu)槎嗑S數(shù)組的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-07-07使用Python pyglet庫編寫一個(gè)可播放音樂的揚(yáng)聲器類流程詳解
這篇文章主要介紹了使用Python pyglet庫編寫一個(gè)可播放音樂的揚(yáng)聲器類,Pyglet主要用于創(chuàng)建視頻游戲、獨(dú)立游戲和多媒體應(yīng)用,它提供了一組用于制作游戲的常用功能,包括圖形渲染、聲音播放、事件處理等等,需要的朋友可以參考下2024-03-03Pytorch深度學(xué)習(xí)之實(shí)現(xiàn)病蟲害圖像分類
PyTorch是一個(gè)開源的Python機(jī)器學(xué)習(xí)庫,基于Torch,用于自然語言處理等應(yīng)用程序。它具有強(qiáng)大的GPU加速的張量計(jì)算和自動(dòng)求導(dǎo)系統(tǒng)的深度神經(jīng)網(wǎng)絡(luò)。本文將介紹如何通過PyTorch實(shí)現(xiàn)病蟲害圖像分類,感興趣的可以學(xué)習(xí)一下2021-12-12