python中celery的基本使用詳情
1.基本介紹
Celery
是由Python
編寫的簡單,靈活,可靠的用來處理大量信息的分布式系統(tǒng),它同時提供操作和維護(hù)分布式系統(tǒng)所需的工具。Celery
專注于實(shí)時任務(wù)處理,支持任務(wù)調(diào)度。
簡單的說,它就是一個分布式隊(duì)列的管理工具,用celery提供的接口快速實(shí)現(xiàn)并管理一個分布式的任務(wù)隊(duì)列。
有一點(diǎn)我們需要搞清楚,Celery
本身并不是任務(wù)隊(duì)列,它是一個分布式隊(duì)列的管理工具
,Celery封裝好了操作常見任務(wù)隊(duì)列的各種操作,比如說從監(jiān)聽某個任務(wù)隊(duì)列并從該隊(duì)列中拿到數(shù)據(jù)進(jìn)行消費(fèi)。
2.使用場景
它可以讓任務(wù)的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機(jī)上運(yùn)行。我們通常使用它來實(shí)現(xiàn)異步任務(wù)(async task
)和定時任務(wù)(crontab
)。
- 異步任務(wù): 將耗時操作任務(wù)提交給
Celery
去異步執(zhí)行,比如發(fā)送短信/郵件、消息推送、音視頻處理等等 - 定時任務(wù): 定時執(zhí)行某件事情,比如每天數(shù)據(jù)統(tǒng)計(jì)
3.工作流程和組成部分
這里用一張圖片說明下:
Celery的架構(gòu)由三部分組成,消息中間件(message broker
),任務(wù)執(zhí)行單元(worker
)和任務(wù)執(zhí)行結(jié)果存儲(task result store
)組成。
消息中間件:
Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成。包括RabbitMQ
, Redis
等等,官方推薦用rabbitMQ
,因?yàn)樗志梅€(wěn)定。
任務(wù)執(zhí)行單元:
Worker
是Celery
提供的任務(wù)執(zhí)行的單元,worker
并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中。
任務(wù)結(jié)果存儲:
Task result store
用來存儲Worker執(zhí)行的任務(wù)的結(jié)果,Celery
支持以不同方式存儲任務(wù)的結(jié)果,包括AMQP, redis
等
另外, Celery還支持不同的并發(fā)和序列化的手段。
并發(fā):Prefork, Eventlet, gevent, threads/single threaded
序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等 先安裝模塊
pip install celery pip install redis
4.Celery執(zhí)行異步任務(wù)
4.1 基礎(chǔ)使用
這里項(xiàng)目結(jié)構(gòu)如下:
第一步:先創(chuàng)建celery相關(guān)配置配置celery_object.py
import celery # 執(zhí)行如下命令: celery -A celery_object worker -l info backend = "redis://127.0.0.1:6379/4" # 設(shè)置redis的4號數(shù)據(jù)庫來存放結(jié)果 broker = "redis://127.0.0.1:6379/5" # 設(shè)置redis的5號數(shù)據(jù)庫存放消息中間件 celery_app = celery.Celery( "celery_demo", backend=backend, broker=broker, include=[ "celery_task", ], ) celery_app.conf.task_serializer = "json" celery_app.conf.result_serializer = "json" celery_app.conf.accept_content = ["json"] celery_app.conf.timezone = "Asia/Shanghai" # 時區(qū) celery_app.conf.enable_utc = False # 是否使用UTC
參數(shù)說明:
- backend 就是異步任務(wù)執(zhí)行完成以后,結(jié)果存放的地方。
- broker 就是具體執(zhí)行任務(wù)的工作節(jié)點(diǎn)。
- celery.Celery()方法是實(shí)例化一個celery對象。
第二步:創(chuàng)建任務(wù)相關(guān)的文件celery_task.py
import time from celery_object import celery_app @celery_app.task def send_email(name): print("向%s發(fā)送郵件..." % name) time.sleep(5) print("向%s發(fā)送郵件完成" % name) return f"成功拿到{name}發(fā)送的郵件!" @celery_app.task def send_msg(name): print("向%s發(fā)送短信..." % name) time.sleep(5) print("向%s發(fā)送短信完成" % name) return f"成功拿到{name}發(fā)送的短信!"
通過@celery_app.task
這樣的裝飾器,成功的把對應(yīng)的函數(shù)變成對應(yīng)celery
的異步worker
函數(shù)。
緊接著我們在項(xiàng)目當(dāng)前所在的目錄執(zhí)行命令:
celery -A celery_object worker -l info
- -A 指的是application應(yīng)用對象
- worker 就是工作人(固定寫法)
- -l 指的是日志級別,這里是打印
info
級別的日志
之后就可以有下面的輸出顯示就代表celery
啟動成功:
之后我們就可以向celery
生產(chǎn)任務(wù)了,創(chuàng)建produce_result
.py文件。
from celery_task import send_email, send_msg if __name__ == "__main__": for i in range(10): result = send_email.delay(f"張三{i}") print(result.id) result2 = send_msg.delay(f"李四{i}") print(result2.id)
運(yùn)行生產(chǎn)任務(wù)的程序,會看到如下的數(shù)據(jù),這里打印的就是任務(wù)ID。
然后在終端可以看到下面的東西,就代表celery
成功的拿到隊(duì)列中任務(wù) 并進(jìn)行消費(fèi)了。
然后打開我們的redis
可以看到有對應(yīng)的數(shù)據(jù)記錄。
與此同時 我們還可以查看celery任務(wù)ID的狀態(tài),check_result.py
寫入如下:
from celery.result import AsyncResult from celery_object import celery_app async_result = AsyncResult(id="d1c722fa-4ebf-432e-967e-a462bdefeac4", app=celery_app) print("任務(wù)狀態(tài):", async_result.status) if async_result.successful(): result = async_result.get() print(result) # result.forget() # 將結(jié)果刪除 elif async_result.failed(): print("執(zhí)行失敗") elif async_result.status == "PENDING": print("任務(wù)等待中被執(zhí)行") elif async_result.status == "RETRY": print("任務(wù)異常后正在重試") elif async_result.status == "STARTED": print("任務(wù)已經(jīng)開始被執(zhí)行")
運(yùn)行結(jié)果:
任務(wù)狀態(tài): SUCCESS
成功拿到李四0發(fā)送的短信!
到此這篇關(guān)于python中celery的基本使用詳情的文章就介紹到這了,更多相關(guān)python celery內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- python測試開發(fā)django之使用supervisord?后臺啟動celery?服務(wù)(worker/beat)
- python中使用Celery容聯(lián)云異步發(fā)送驗(yàn)證碼功能
- Python中celery的使用
- python使用celery實(shí)現(xiàn)訂單超時取消
- python3中celery異步框架簡單使用+守護(hù)進(jìn)程方式啟動
- Python Celery異步任務(wù)隊(duì)列使用方法解析
- python使用celery實(shí)現(xiàn)異步任務(wù)執(zhí)行的例子
- python celery分布式任務(wù)隊(duì)列的使用詳解
- Python環(huán)境下安裝使用異步任務(wù)隊(duì)列包Celery的基礎(chǔ)教程
相關(guān)文章
對python生成業(yè)務(wù)報(bào)表的實(shí)例詳解
今天小編就為大家分享一篇對python生成業(yè)務(wù)報(bào)表的實(shí)例詳解,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-02-02Pandas 對Dataframe結(jié)構(gòu)排序的實(shí)現(xiàn)方法
下面小編就為大家分享一篇Pandas 對Dataframe結(jié)構(gòu)排序的實(shí)現(xiàn)方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-04-04pycharm引入其他目錄的包報(bào)錯,import報(bào)錯的解決
這篇文章主要介紹了pycharm引入其他目錄的包報(bào)錯,import報(bào)錯的解決,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08