Python分布式異步任務(wù)框架Celery使用教程
一、Celery架構(gòu)介紹
Celery:芹菜?(跟翻譯沒有任何關(guān)系),分布式異步任務(wù)框架(跟其他web框架無(wú)關(guān))
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.(不支持windows)
celery服務(wù)為其他項(xiàng)目服務(wù)提供異步解決任務(wù)需求的。
架構(gòu):
分為三部分
- broker:任務(wù)中間件,用戶提交的任務(wù),存在這個(gè)里面(redis,rabbitmq)
- worker:任務(wù)執(zhí)行者,消費(fèi)者,真正執(zhí)行任務(wù)的進(jìn)程(真正干活的人)
- backend:任務(wù)結(jié)果存儲(chǔ),任務(wù)執(zhí)行后的結(jié)果(redis,rabbitmq)
celery能夠做的事:
- 異步任務(wù)(區(qū)分同步任務(wù))
- 延遲任務(wù)
- 定時(shí)任務(wù)(其他框架做)
怎么更好的理解celery?
會(huì)有兩個(gè)服務(wù)同時(shí)運(yùn)行,一個(gè)是項(xiàng)目服務(wù)(django服務(wù)),一個(gè)是celery服務(wù),項(xiàng)目服務(wù)將需要異步處理的任務(wù)交給celery服務(wù),celery就會(huì)在需要時(shí)異步完成項(xiàng)目的需求。打個(gè)比方,人是一個(gè)獨(dú)立運(yùn)行的服務(wù)(django) | 醫(yī)院也是一個(gè)獨(dú)立運(yùn)行的服務(wù)(celery)。正常情況下,人可以完成所有健康情況的動(dòng)作,不需要醫(yī)院的參與;但當(dāng)人生病時(shí),就會(huì)被醫(yī)院接收,解決人生病問題,人生病的處理方案交給醫(yī)院來(lái)解決,所有人不生病時(shí),醫(yī)院獨(dú)立運(yùn)行,人生病時(shí),醫(yī)院就來(lái)解決人生病的需求。
注:python有自己的定時(shí)任務(wù),感興趣的了解下apscheduler
。
二、Celery簡(jiǎn)單使用
安裝:pip install celery==5.1.2
使用:
1.配置celery
from celery import Celery # app=Celery('test',) # backend='redis://:密碼@127.0.0.1:6379/1' 如果有密碼,這么寫 broker = 'redis://127.0.0.1:6379/1' # redis地址 backend = 'redis://127.0.0.1:6379/2' # redis地址 # 1 實(shí)例化得到celery對(duì)象 app = Celery(__name__, backend=backend, broker=broker) # 2 寫一堆任務(wù)(計(jì)算a+b,挖井,砍樹),函數(shù) # 使用裝飾器包裹任務(wù)(函數(shù)) @app.task() def add(a, b): import time time.sleep(2) return a + b
2.提交任務(wù)
# from celery_task import app import celery_task # 1 同步執(zhí)行 # res = celery_task.add(2, 3) # 普通的同步任務(wù),同步執(zhí)行任務(wù) # print(res)
2 異步任務(wù):
第一步:提交(使用任務(wù)名.apply_async(參數(shù)))
結(jié)果是任務(wù)id號(hào),唯一標(biāo)識(shí)這個(gè)任務(wù)
# res = celery_task.add.apply_async(args=[2, 3]) res = celery_task.add.apply_async(kwargs={'a':2,'b':3}) print(res) # abab1ad3-0e58-4faa-bc05-14d157dc8217
第二步:讓worker執(zhí)行—>結(jié)果存到redis
通過命令啟動(dòng),非windows:
5.x之前這么啟動(dòng)
命令:celery worker -A celery_task -l info
5.x以后
命令:celery -A celery_task worker -l info
windows:
pip3 install eventlet
5.x之前這么啟動(dòng)
命令:celery worker -A celery_task -l info -P eventlet
5.x以后
命令:celery -A celery_task worker -l info -P eventlet
3.查看任務(wù)執(zhí)行結(jié)果
from celery_task import app from celery.result import AsyncResult id = 'abab1ad3-0e58-4faa-bc05-14d157dc8217' if __name__ == '__main__': a = AsyncResult(id=id, app=app) if a.successful(): print('任務(wù)執(zhí)行成功了') result = a.get() # 異步任務(wù)執(zhí)行的結(jié)果 print(result) elif a.failed(): print('任務(wù)失敗') elif a.status == 'PENDING': print('任務(wù)等待中被執(zhí)行') elif a.status == 'RETRY': print('任務(wù)異常后正在重試') elif a.status == 'STARTED': print('任務(wù)已經(jīng)開始被執(zhí)行')
三、Celery包結(jié)構(gòu)
目錄結(jié)構(gòu):
-celery_task # 包名
__init__.py
celery.py # app所在py文件
course_task.py # 任務(wù)
order_task.py # 任務(wù)
user_task.py # 任務(wù)
提交任務(wù).py # 提交任務(wù)
查看結(jié)果.py # 查看結(jié)果
創(chuàng)建多個(gè)任務(wù):
celery_task /celery.py
from celery import Celery broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' # include 是一個(gè)列表,放被管理的task 的py文件 app = Celery(__name__, backend=backend, broker=broker,include=[ 'celery_task.course_task', 'celery_task.order_task', 'celery_task.user_task', ]) # 原來(lái),任務(wù)寫在這個(gè)py文件中 # 后期任務(wù)非常多,可能有用戶相關(guān)任務(wù),課程相關(guān)任務(wù),訂單相關(guān)任務(wù)。。。
celery_task /任務(wù).py
user_task.py
import time from .celery import app # 發(fā)送短信任務(wù) @app.task() def send_sms(phone, code): time.sleep(3) # 模擬發(fā)送短信延遲 print('短信發(fā)送成功,手機(jī)號(hào)是:%s,驗(yàn)證碼是:%s' % (phone, code)) return '短信發(fā)送成功'
order_task.py
from .celery import app # 生成訂單任務(wù) @app.task() def make_order(): with open(r'D:\py18\luffy_api\script\2 celery的包結(jié)構(gòu)\celery_task\order.txt', 'a', encoding='utf-8') as f: f.write('生成一條訂單\n') return True
course_task.py
from .celery import app @app.task() def add(a,b): return a+b
提交多個(gè)任務(wù):
from celery_task import user_task,order_task # 提交一個(gè)發(fā)送短信任務(wù) # res = user_task.send_sms.apply_async(args=['18972374345', '8888']) # print(res) # 提交一個(gè)生成訂單任務(wù) # res=order_task.make_order.apply_async() # print(res)
查看結(jié)果:
from celery_task.celery import app from celery.result import AsyncResult id = '0f283e22-e8d0-40a6-a8ed-8998038bc7a3' if __name__ == '__main__': a = AsyncResult(id=id, app=app) print(app.conf) if a.successful(): print('任務(wù)執(zhí)行成功了') result = a.get() # 異步任務(wù)執(zhí)行的結(jié)果 print(result) elif a.failed(): print('任務(wù)失敗') elif a.status == 'PENDING': print('任務(wù)等待中被執(zhí)行') elif a.status == 'RETRY': print('任務(wù)異常后正在重試') elif a.status == 'STARTED': print('任務(wù)已經(jīng)開始被執(zhí)行')
四、Celery延遲任務(wù)
# 添加延遲任務(wù)方式一: # from datetime import datetime, timedelta # datetime.utcnow() 獲取當(dāng)前的utc時(shí)間 # eta=datetime.utcnow() + timedelta(seconds=50) # 50s后的utc時(shí)間 # 10s后,發(fā)送短信 res=user_task.send_sms.apply_async(args=('12345566677', '8888'), eta=eta) print(res) # 使用第二種方式執(zhí)行異步任務(wù)(兩者傳參不同;不寫時(shí)間,就表示立即執(zhí)行): res=user_task.send_sms.delay('12345566677', '8888') print(res)
五、Celery定時(shí)任務(wù)
第一步:celery.py中寫入
# 第一步,在包(celery_task)下的celey.py中寫入 ###修改celery的配置信息 app.conf整個(gè)celery的配置信息 # 時(shí)區(qū) app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False ####配置定時(shí)任務(wù) from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'send_sms_every_3_seconds': { 'task': 'celery_task.user_task.send_sms', # 指定執(zhí)行的是哪個(gè)任務(wù) 'schedule': timedelta(seconds=3), # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點(diǎn) 'args': ('18953675221', '8888'), }, 'make_order_every_5_seconds': { 'task': 'celery_task.order_task.make_order', # 指定執(zhí)行的是哪個(gè)任務(wù) 'schedule': timedelta(seconds=5), }, 'add_every_1_seconds': { 'task': 'celery_task.course_task.add', # 指定執(zhí)行的是哪個(gè)任務(wù) 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點(diǎn) 'args': (3, 5), }, }
第二步:?jiǎn)?dòng)worker
# celery worker -A 包名 -l info -P eventlet
celery worker -A celery_task -l info -P eventlet
如果beat沒有啟動(dòng),worker是沒有活干的,需要啟動(dòng)beat,worker才能干活,和beat啟動(dòng)順序無(wú)先后
第三步:?jiǎn)?dòng)beat
# celery beat -A celery_task -l
celery -A celery_task beat -l info
六、Django中集成Celery
第一種方式使用django-celery
(了解):
第三方把django和celery集成起來(lái),方便我們使用,但是,第三方寫的包的版本,跟celery和django版本完全對(duì)應(yīng)。
我們自己使用包結(jié)構(gòu)集成到django中:
第一步,把寫好的包,直接復(fù)制到項(xiàng)目根路徑
第二步,在視圖類中(函數(shù)中)
from celery_task.user_task import send_sms def test(request): mobile = request.GET.get('mobile') code = '9999' res = send_sms.delay(mobile, code) # 同步發(fā)送假設(shè)3分支鐘,異步發(fā)送,直接就返回id了,是否成功不知道,后期通過id查詢 print(res) return HttpResponse(res)
到此這篇關(guān)于Python分布式異步任務(wù)框架Celery使用教程的文章就介紹到這了,更多相關(guān)Python Celery內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python常用數(shù)據(jù)結(jié)構(gòu)元組詳解
這篇文章主要介紹了python常用數(shù)據(jù)結(jié)構(gòu)元組詳解,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-08-08python實(shí)現(xiàn)一個(gè)簡(jiǎn)單的貪吃蛇游戲附代碼
這篇文章主要介紹了python實(shí)現(xiàn)一個(gè)簡(jiǎn)單的貪吃蛇游戲附代碼,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-06-06用PyQt進(jìn)行Python圖形界面的程序的開發(fā)的入門指引
這篇文章主要介紹了用PyQt進(jìn)行Python圖形界面的程序的開發(fā)的入門指引,來(lái)自于IBM官方網(wǎng)站技術(shù)文檔,需要的朋友可以參考下2015-04-04Python的Asyncore異步Socket模塊及實(shí)現(xiàn)端口轉(zhuǎn)發(fā)的例子
asyncore模塊是封裝過的處理socket事件的模塊,采用異步的處理方式,這里我們講來(lái)講解Python的Asyncore異步Socket模塊及實(shí)現(xiàn)端口轉(zhuǎn)發(fā)的例子,需要的朋友可以參考下2016-06-06解決linux下使用python打開terminal時(shí)報(bào)錯(cuò)的問題
這篇文章主要介紹了linux下使用python打開terminal時(shí)報(bào)錯(cuò),本文通過兩種場(chǎng)景分析給大家詳細(xì)講解,需要的朋友可以參考下2023-03-03Python流程控制 if else實(shí)現(xiàn)解析
這篇文章主要介紹了Python 流程控制 if else實(shí)現(xiàn)解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09