Python分布式異步任務(wù)框架Celery使用教程
一、Celery架構(gòu)介紹
Celery:芹菜?(跟翻譯沒有任何關(guān)系),分布式異步任務(wù)框架(跟其他web框架無關(guān))
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.(不支持windows)
celery服務(wù)為其他項目服務(wù)提供異步解決任務(wù)需求的。
架構(gòu):
分為三部分
- broker:任務(wù)中間件,用戶提交的任務(wù),存在這個里面(redis,rabbitmq)
- worker:任務(wù)執(zhí)行者,消費者,真正執(zhí)行任務(wù)的進程(真正干活的人)
- backend:任務(wù)結(jié)果存儲,任務(wù)執(zhí)行后的結(jié)果(redis,rabbitmq)


celery能夠做的事:
- 異步任務(wù)(區(qū)分同步任務(wù))
- 延遲任務(wù)
- 定時任務(wù)(其他框架做)
怎么更好的理解celery?
會有兩個服務(wù)同時運行,一個是項目服務(wù)(django服務(wù)),一個是celery服務(wù),項目服務(wù)將需要異步處理的任務(wù)交給celery服務(wù),celery就會在需要時異步完成項目的需求。打個比方,人是一個獨立運行的服務(wù)(django) | 醫(yī)院也是一個獨立運行的服務(wù)(celery)。正常情況下,人可以完成所有健康情況的動作,不需要醫(yī)院的參與;但當人生病時,就會被醫(yī)院接收,解決人生病問題,人生病的處理方案交給醫(yī)院來解決,所有人不生病時,醫(yī)院獨立運行,人生病時,醫(yī)院就來解決人生病的需求。
注:python有自己的定時任務(wù),感興趣的了解下apscheduler。
二、Celery簡單使用
安裝:pip install celery==5.1.2
使用:
1.配置celery
from celery import Celery
# app=Celery('test',)
# backend='redis://:密碼@127.0.0.1:6379/1' 如果有密碼,這么寫
broker = 'redis://127.0.0.1:6379/1' # redis地址
backend = 'redis://127.0.0.1:6379/2' # redis地址
# 1 實例化得到celery對象
app = Celery(__name__, backend=backend, broker=broker)
# 2 寫一堆任務(wù)(計算a+b,挖井,砍樹),函數(shù)
# 使用裝飾器包裹任務(wù)(函數(shù))
@app.task()
def add(a, b):
import time
time.sleep(2)
return a + b2.提交任務(wù)
# from celery_task import app import celery_task # 1 同步執(zhí)行 # res = celery_task.add(2, 3) # 普通的同步任務(wù),同步執(zhí)行任務(wù) # print(res)
2 異步任務(wù):
第一步:提交(使用任務(wù)名.apply_async(參數(shù)))
結(jié)果是任務(wù)id號,唯一標識這個任務(wù)
# res = celery_task.add.apply_async(args=[2, 3])
res = celery_task.add.apply_async(kwargs={'a':2,'b':3})
print(res) # abab1ad3-0e58-4faa-bc05-14d157dc8217第二步:讓worker執(zhí)行—>結(jié)果存到redis
通過命令啟動,非windows:
5.x之前這么啟動
命令:celery worker -A celery_task -l info
5.x以后
命令:celery -A celery_task worker -l info
windows:
pip3 install eventlet
5.x之前這么啟動
命令:celery worker -A celery_task -l info -P eventlet
5.x以后
命令:celery -A celery_task worker -l info -P eventlet
3.查看任務(wù)執(zhí)行結(jié)果
from celery_task import app
from celery.result import AsyncResult
id = 'abab1ad3-0e58-4faa-bc05-14d157dc8217'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful():
print('任務(wù)執(zhí)行成功了')
result = a.get() # 異步任務(wù)執(zhí)行的結(jié)果
print(result)
elif a.failed():
print('任務(wù)失敗')
elif a.status == 'PENDING':
print('任務(wù)等待中被執(zhí)行')
elif a.status == 'RETRY':
print('任務(wù)異常后正在重試')
elif a.status == 'STARTED':
print('任務(wù)已經(jīng)開始被執(zhí)行')三、Celery包結(jié)構(gòu)
目錄結(jié)構(gòu):
-celery_task # 包名
__init__.py
celery.py # app所在py文件
course_task.py # 任務(wù)
order_task.py # 任務(wù)
user_task.py # 任務(wù)
提交任務(wù).py # 提交任務(wù)
查看結(jié)果.py # 查看結(jié)果
創(chuàng)建多個任務(wù):
celery_task /celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# include 是一個列表,放被管理的task 的py文件
app = Celery(__name__, backend=backend, broker=broker,include=[
'celery_task.course_task',
'celery_task.order_task',
'celery_task.user_task',
])
# 原來,任務(wù)寫在這個py文件中
# 后期任務(wù)非常多,可能有用戶相關(guān)任務(wù),課程相關(guān)任務(wù),訂單相關(guān)任務(wù)。。。celery_task /任務(wù).py
user_task.py
import time
from .celery import app
# 發(fā)送短信任務(wù)
@app.task()
def send_sms(phone, code):
time.sleep(3) # 模擬發(fā)送短信延遲
print('短信發(fā)送成功,手機號是:%s,驗證碼是:%s' % (phone, code))
return '短信發(fā)送成功'order_task.py
from .celery import app
# 生成訂單任務(wù)
@app.task()
def make_order():
with open(r'D:\py18\luffy_api\script\2 celery的包結(jié)構(gòu)\celery_task\order.txt', 'a', encoding='utf-8') as f:
f.write('生成一條訂單\n')
return Truecourse_task.py
from .celery import app
@app.task()
def add(a,b):
return a+b提交多個任務(wù):
from celery_task import user_task,order_task # 提交一個發(fā)送短信任務(wù) # res = user_task.send_sms.apply_async(args=['18972374345', '8888']) # print(res) # 提交一個生成訂單任務(wù) # res=order_task.make_order.apply_async() # print(res)
查看結(jié)果:
from celery_task.celery import app
from celery.result import AsyncResult
id = '0f283e22-e8d0-40a6-a8ed-8998038bc7a3'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
print(app.conf)
if a.successful():
print('任務(wù)執(zhí)行成功了')
result = a.get() # 異步任務(wù)執(zhí)行的結(jié)果
print(result)
elif a.failed():
print('任務(wù)失敗')
elif a.status == 'PENDING':
print('任務(wù)等待中被執(zhí)行')
elif a.status == 'RETRY':
print('任務(wù)異常后正在重試')
elif a.status == 'STARTED':
print('任務(wù)已經(jīng)開始被執(zhí)行')四、Celery延遲任務(wù)
# 添加延遲任務(wù)方式一:
# from datetime import datetime, timedelta
# datetime.utcnow() 獲取當前的utc時間
# eta=datetime.utcnow() + timedelta(seconds=50) # 50s后的utc時間
# 10s后,發(fā)送短信
res=user_task.send_sms.apply_async(args=('12345566677', '8888'), eta=eta)
print(res)
# 使用第二種方式執(zhí)行異步任務(wù)(兩者傳參不同;不寫時間,就表示立即執(zhí)行):
res=user_task.send_sms.delay('12345566677', '8888')
print(res)五、Celery定時任務(wù)
第一步:celery.py中寫入
# 第一步,在包(celery_task)下的celey.py中寫入
###修改celery的配置信息 app.conf整個celery的配置信息
# 時區(qū)
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
####配置定時任務(wù)
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'send_sms_every_3_seconds': {
'task': 'celery_task.user_task.send_sms', # 指定執(zhí)行的是哪個任務(wù)
'schedule': timedelta(seconds=3),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點
'args': ('18953675221', '8888'),
},
'make_order_every_5_seconds': {
'task': 'celery_task.order_task.make_order', # 指定執(zhí)行的是哪個任務(wù)
'schedule': timedelta(seconds=5),
},
'add_every_1_seconds': {
'task': 'celery_task.course_task.add', # 指定執(zhí)行的是哪個任務(wù)
'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點
'args': (3, 5),
},
}第二步:啟動worker
# celery worker -A 包名 -l info -P eventlet
celery worker -A celery_task -l info -P eventlet
如果beat沒有啟動,worker是沒有活干的,需要啟動beat,worker才能干活,和beat啟動順序無先后
第三步:啟動beat
# celery beat -A celery_task -l
celery -A celery_task beat -l info
六、Django中集成Celery
第一種方式使用django-celery(了解):
第三方把django和celery集成起來,方便我們使用,但是,第三方寫的包的版本,跟celery和django版本完全對應。
我們自己使用包結(jié)構(gòu)集成到django中:
第一步,把寫好的包,直接復制到項目根路徑
第二步,在視圖類中(函數(shù)中)
from celery_task.user_task import send_sms
def test(request):
mobile = request.GET.get('mobile')
code = '9999'
res = send_sms.delay(mobile, code) # 同步發(fā)送假設(shè)3分支鐘,異步發(fā)送,直接就返回id了,是否成功不知道,后期通過id查詢
print(res)
return HttpResponse(res)到此這篇關(guān)于Python分布式異步任務(wù)框架Celery使用教程的文章就介紹到這了,更多相關(guān)Python Celery內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python常用數(shù)據(jù)結(jié)構(gòu)元組詳解
這篇文章主要介紹了python常用數(shù)據(jù)結(jié)構(gòu)元組詳解,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-08-08
用PyQt進行Python圖形界面的程序的開發(fā)的入門指引
這篇文章主要介紹了用PyQt進行Python圖形界面的程序的開發(fā)的入門指引,來自于IBM官方網(wǎng)站技術(shù)文檔,需要的朋友可以參考下2015-04-04
Python的Asyncore異步Socket模塊及實現(xiàn)端口轉(zhuǎn)發(fā)的例子
asyncore模塊是封裝過的處理socket事件的模塊,采用異步的處理方式,這里我們講來講解Python的Asyncore異步Socket模塊及實現(xiàn)端口轉(zhuǎn)發(fā)的例子,需要的朋友可以參考下2016-06-06
解決linux下使用python打開terminal時報錯的問題
這篇文章主要介紹了linux下使用python打開terminal時報錯,本文通過兩種場景分析給大家詳細講解,需要的朋友可以參考下2023-03-03

