python中celery的基本使用詳情
1.基本介紹
Celery 是由Python 編寫的簡單,靈活,可靠的用來處理大量信息的分布式系統(tǒng),它同時提供操作和維護分布式系統(tǒng)所需的工具。Celery 專注于實時任務(wù)處理,支持任務(wù)調(diào)度。
簡單的說,它就是一個分布式隊列的管理工具,用celery提供的接口快速實現(xiàn)并管理一個分布式的任務(wù)隊列。
有一點我們需要搞清楚,Celery 本身并不是任務(wù)隊列,它是一個分布式隊列的管理工具,Celery封裝好了操作常見任務(wù)隊列的各種操作,比如說從監(jiān)聽某個任務(wù)隊列并從該隊列中拿到數(shù)據(jù)進行消費。
2.使用場景
它可以讓任務(wù)的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現(xiàn)異步任務(wù)(async task)和定時任務(wù)(crontab)。
- 異步任務(wù): 將耗時操作任務(wù)提交給
Celery去異步執(zhí)行,比如發(fā)送短信/郵件、消息推送、音視頻處理等等 - 定時任務(wù): 定時執(zhí)行某件事情,比如每天數(shù)據(jù)統(tǒng)計
3.工作流程和組成部分
這里用一張圖片說明下:

Celery的架構(gòu)由三部分組成,消息中間件(message broker),任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(task result store)組成。
消息中間件:
Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成。包括RabbitMQ, Redis等等,官方推薦用rabbitMQ,因為它持久穩(wěn)定。
任務(wù)執(zhí)行單元:
Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運行在分布式的系統(tǒng)節(jié)點中。
任務(wù)結(jié)果存儲:
Task result store用來存儲Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲任務(wù)的結(jié)果,包括AMQP, redis等
另外, Celery還支持不同的并發(fā)和序列化的手段。
并發(fā):Prefork, Eventlet, gevent, threads/single threaded
序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等 先安裝模塊
pip install celery pip install redis
4.Celery執(zhí)行異步任務(wù)
4.1 基礎(chǔ)使用
這里項目結(jié)構(gòu)如下:

第一步:先創(chuàng)建celery相關(guān)配置配置celery_object.py
import celery
# 執(zhí)行如下命令: celery -A celery_object worker -l info
backend = "redis://127.0.0.1:6379/4" # 設(shè)置redis的4號數(shù)據(jù)庫來存放結(jié)果
broker = "redis://127.0.0.1:6379/5" # 設(shè)置redis的5號數(shù)據(jù)庫存放消息中間件
celery_app = celery.Celery(
"celery_demo",
backend=backend,
broker=broker,
include=[
"celery_task",
],
)
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.accept_content = ["json"]
celery_app.conf.timezone = "Asia/Shanghai" # 時區(qū)
celery_app.conf.enable_utc = False # 是否使用UTC參數(shù)說明:
- backend 就是異步任務(wù)執(zhí)行完成以后,結(jié)果存放的地方。
- broker 就是具體執(zhí)行任務(wù)的工作節(jié)點。
- celery.Celery()方法是實例化一個celery對象。
第二步:創(chuàng)建任務(wù)相關(guān)的文件celery_task.py
import time
from celery_object import celery_app
@celery_app.task
def send_email(name):
print("向%s發(fā)送郵件..." % name)
time.sleep(5)
print("向%s發(fā)送郵件完成" % name)
return f"成功拿到{name}發(fā)送的郵件!"
@celery_app.task
def send_msg(name):
print("向%s發(fā)送短信..." % name)
time.sleep(5)
print("向%s發(fā)送短信完成" % name)
return f"成功拿到{name}發(fā)送的短信!"通過@celery_app.task這樣的裝飾器,成功的把對應的函數(shù)變成對應celery的異步worker函數(shù)。
緊接著我們在項目當前所在的目錄執(zhí)行命令:
celery -A celery_object worker -l info
- -A 指的是application應用對象
- worker 就是工作人(固定寫法)
- -l 指的是日志級別,這里是打印
info級別的日志
之后就可以有下面的輸出顯示就代表celery啟動成功:

之后我們就可以向celery生產(chǎn)任務(wù)了,創(chuàng)建produce_result.py文件。
from celery_task import send_email, send_msg
if __name__ == "__main__":
for i in range(10):
result = send_email.delay(f"張三{i}")
print(result.id)
result2 = send_msg.delay(f"李四{i}")
print(result2.id)運行生產(chǎn)任務(wù)的程序,會看到如下的數(shù)據(jù),這里打印的就是任務(wù)ID。

然后在終端可以看到下面的東西,就代表celery成功的拿到隊列中任務(wù) 并進行消費了。

然后打開我們的redis可以看到有對應的數(shù)據(jù)記錄。

與此同時 我們還可以查看celery任務(wù)ID的狀態(tài),check_result.py寫入如下:
from celery.result import AsyncResult
from celery_object import celery_app
async_result = AsyncResult(id="d1c722fa-4ebf-432e-967e-a462bdefeac4", app=celery_app)
print("任務(wù)狀態(tài):", async_result.status)
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 將結(jié)果刪除
elif async_result.failed():
print("執(zhí)行失敗")
elif async_result.status == "PENDING":
print("任務(wù)等待中被執(zhí)行")
elif async_result.status == "RETRY":
print("任務(wù)異常后正在重試")
elif async_result.status == "STARTED":
print("任務(wù)已經(jīng)開始被執(zhí)行")運行結(jié)果:
任務(wù)狀態(tài): SUCCESS
成功拿到李四0發(fā)送的短信!
到此這篇關(guān)于python中celery的基本使用詳情的文章就介紹到這了,更多相關(guān)python celery內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- python測試開發(fā)django之使用supervisord?后臺啟動celery?服務(wù)(worker/beat)
- python中使用Celery容聯(lián)云異步發(fā)送驗證碼功能
- Python中celery的使用
- python使用celery實現(xiàn)訂單超時取消
- python3中celery異步框架簡單使用+守護進程方式啟動
- Python Celery異步任務(wù)隊列使用方法解析
- python使用celery實現(xiàn)異步任務(wù)執(zhí)行的例子
- python celery分布式任務(wù)隊列的使用詳解
- Python環(huán)境下安裝使用異步任務(wù)隊列包Celery的基礎(chǔ)教程
相關(guān)文章
Pandas 對Dataframe結(jié)構(gòu)排序的實現(xiàn)方法
下面小編就為大家分享一篇Pandas 對Dataframe結(jié)構(gòu)排序的實現(xiàn)方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-04-04

