詳解Python Celery和RabbitMQ實戰(zhàn)教程
前言
Celery是一個異步任務(wù)隊列。它可以用于需要異步運行的任何內(nèi)容。RabbitMQ是Celery廣泛使用的消息代理。在本這篇文章中,我將使用RabbitMQ來介紹Celery的基本概念,然后為一個小型演示項目設(shè)置Celery 。最后,設(shè)置一個Celery Web控制臺來監(jiān)視我的任務(wù)
基本概念
來!看圖說話:
Broker
Broker(RabbitMQ)負(fù)責(zé)創(chuàng)建任務(wù)隊列,根據(jù)一些路由規(guī)則將任務(wù)分派到任務(wù)隊列,然后將任務(wù)從任務(wù)隊列交付給worker
Consumer (Celery Workers)
Consumer是執(zhí)行任務(wù)的一個或多個Celery workers??梢愿鶕?jù)用例啟動許多workers
Result Backend
后端用于存儲任務(wù)的結(jié)果。但是,它不是必需的元素,如果不在設(shè)置中包含它,就無法訪問任務(wù)的結(jié)果
安裝Celery
首先,需要安裝好Celery,可以使用PyPI:
pip install celery
選擇一個Broker:RabbitMQ
為什么我們需要broker呢?這是因為Celery本身并不構(gòu)造消息隊列,所以它需要一個額外的消息傳輸來完成這項工作。這里可以將Celery看作消息代理的包裝器
實際上,也可以從幾個不同的代理中進(jìn)行選擇,比如RabbitMQ、Redis或數(shù)據(jù)庫(例如Django數(shù)據(jù)庫)
在這里使用RabbitMQ作為代理,因為它功能完整、穩(wěn)定,Celery推薦使用它。由于演示我的環(huán)境是在Mac OS中,安裝RabbitMQ使用Homebrew即可:
brew install rabbitmq #如果是Ubuntu的話使用apt-get安裝
啟動RabbitMQ
程序?qū)⒃?strong>/usr/local/sbin中安裝RabbitMQ,雖然有些系統(tǒng)可能會有所不同??梢詫⒋寺窂教砑拥江h(huán)境變量路徑,以便以后方便地使用。例如,打開shell啟動文件~/.bash_profile添加:
PATH=$PATH:/usr/local/sbin
現(xiàn)在,可以使用rabbitmq-server命令啟動我們的RabbitMQ服務(wù)器。檢查RabbitMQ服務(wù)器成功啟動,將看到類似的輸出:
為Celery配置RabbitMQ
RabbitMQ使用Celery之前,需要對RabbitMQ進(jìn)行一些配置。簡單地說,我們需要創(chuàng)建一個虛擬主機和用戶,然后設(shè)置用戶權(quán)限,以便它可以訪問虛擬主機
# 添加用戶跟密碼 $ rabbitmqctl add_user test test123 # 添加虛擬主機 $ rabbitmqctl add_vhost test_vhost # 為用戶添加標(biāo)簽 $ rabbitmqctl set_user_tags test test_tag # 設(shè)置用戶權(quán)限 $ rabbitmqctl set_permissions -p test_vhost test ".*" ".*" ".*"
敲黑板!RabbitMQ中有三種操作:配置、寫入和讀取
上面命令末尾的字符串表示用戶test將擁有所有配置、寫入和讀取權(quán)限
演示項目
現(xiàn)在讓我們創(chuàng)建一個簡單的項目來演示Celery的使用
在celery.py中添加以下代碼:
from __future__ import absolute_import from celery import Celery app = Celery('test_celery', broker='amqp://test:test123@localhost/test_vhost', backend='rpc://', include=['test_celery.tasks'])
在這里,初始化了一個名為app的Celery實例,將用于創(chuàng)建一個任務(wù)。Celery的第一個參數(shù)只是項目包的名稱,即“test_celery”。
broker參數(shù)指定代理URL,對于RabbitMQ,傳輸是amqp。
后端參數(shù)指定后端URL。Celery中的后端用于存儲任務(wù)結(jié)果。因此,如果需要在任務(wù)完成時訪問任務(wù)的結(jié)果,應(yīng)該為Celery設(shè)置一個后端。
rpc意味著將結(jié)果作為AMQP消息發(fā)送回去,這對本次演示來說是一種可接受的格式
include參數(shù)指定了在Celery工作程序啟動時要導(dǎo)入的模塊列表。我們在這里添加了tasks模塊,以便找到我們的任務(wù)。
在tasks.py這個文件中,定義了我們的任務(wù)add_longtime:
from __future__ import absolute_import from test_celery.celery import app import time @app.task def add_longtime(a, b): print 'long time task begins' # sleep 5 seconds time.sleep(5) print 'long time task finished' return a + b
可以看到,導(dǎo)入了在前面的Celery模塊中定義的應(yīng)用程序,并將其用作任務(wù)方法的裝飾器。另外注意!app.task只是一個裝飾器。此外,我們在add_longtime任務(wù)中休眠5秒,以模擬一個耗時較長的Task
在設(shè)置好Celery之后,我們需要開始運行任務(wù),它包含在runs_tasks.py:
from .tasks import add_longtime import time if __name__ == '__main__': result = add_longtime.delay(1,2) #此時,任務(wù)還未完成,它將返回False print 'Task finished? ', result.ready() print 'Task result: ', result.result # 延長到10秒以確保任務(wù)已經(jīng)完成 time.sleep(10) # 現(xiàn)在任務(wù)完成,ready方法將返回True print 'Task finished? ', result.ready() print 'Task result: ', result.result
這里,我們使用delay方法調(diào)用任務(wù)add_longtime,如果我們想異步處理任務(wù),就需要使用delay方法。此外,保存任務(wù)的結(jié)果并打印一些信息。如果任務(wù)已經(jīng)完成,ready方法將返回True,否則返回False。result屬性是任務(wù)的結(jié)果,如果任務(wù)尚未完成,則返回None。
啟動Celery
現(xiàn)在,可以使用下面的命令啟動Celery(注:在項目文件夾中運行):
celery -A test_celery worker --loglevel=info
Celery成功連接到RabbitMQ,你會看到這樣的東西:
運行任務(wù)
再項目文件中輸入以下命令運行它:
python -m test_celery.run_tasks
查看Celery控制臺,看到運行任務(wù):
[2020-05-15 17:15:21,508: INFO/MainProcess]
Received task: test_celery.tasks.add_longtime[25ba9c87-69a7-4383-b983-1cefdb32f8b3]
[2020-05-15 17:15:21,508: WARNING/Worker-3] long time task begins
[2020-05-15 17:15:31,510: WARNING/Worker-3] long time task finished
[2020-05-15 17:15:31,512: INFO/MainProcess]
Task test_celery.tasks.add_longtime[25ba9c87-69a7-4383-b983-1cefdb32f8b3] succeeded in 15.003732774s: 3
當(dāng)Celery收到一個任務(wù),它打印出任務(wù)名稱與任務(wù)id(在括號中):
Received task: test_celery.tasks.add_longtime[7d942984-8ea6-4e4d-8097-225616f797d5]
在這一行下面是我們的任務(wù)add_longtime打印的兩行,時間延遲為5秒:
long time task begins long time task finished
最后一行顯示我們的任務(wù)在5秒內(nèi)完成,任務(wù)結(jié)果為3:
Task test_celery.tasks.add_longtime[7d942984-8ea6-4e4d-8097-225616f797d5] succeeded in 5.025242167s: 3
在當(dāng)前控制臺中,您將看到以下輸出:
實時監(jiān)控Celery
Flower是一款基于網(wǎng)絡(luò)的Celery實時監(jiān)控軟件。使用Flower,可以輕松地監(jiān)視任務(wù)進(jìn)度和歷史記錄
使用pip來安裝Flower:
pip install flower
要啟動Flower web控制臺,需要運行以下命令:
celery -A test_celery flower
Flower將運行具有默認(rèn)端口5555的服務(wù)器,可以通過http://localhost:5555訪問web控制臺
到此這篇關(guān)于詳解Python Celery和RabbitMQ實戰(zhàn)教程的文章就介紹到這了,更多相關(guān)Python Celery和RabbitMQ實戰(zhàn)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python?管理系統(tǒng)實現(xiàn)mysql交互的示例代碼
這篇文章主要介紹了python?管理系統(tǒng)實現(xiàn)mysql交互,本文通過實例代碼圖文相結(jié)合給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-12-12Django重裝mysql后啟動報錯:No module named ‘MySQLdb’的解決方法
這篇文章主要給大家介紹了關(guān)于Django重裝mysql后啟動報錯:No module named ‘MySQLdb’的解決方法,分享出來,對同樣遇到這個問題的朋友們一個參考學(xué)習(xí),需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2018-04-04numpy中np.append()函數(shù)用法小結(jié)
在numpy的函數(shù)庫中,np.append()函數(shù)是一個常用的數(shù)組操作函數(shù),它在進(jìn)行數(shù)組操作時能夠?qū)蓚€數(shù)組進(jìn)行拼接,并返回一個拼接后的新數(shù)組,下面就來介紹一下具體用法,感興趣的可以了解一下2023-11-11Python sklearn中的K-Means聚類使用方法淺析
這篇文章主要介紹了Python sklearn中的K-Means聚類使用方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2022-12-12pyqt實現(xiàn).ui文件批量轉(zhuǎn)換為對應(yīng).py文件腳本
今天小編就為大家分享一篇pyqt實現(xiàn).ui文件批量轉(zhuǎn)換為對應(yīng).py文件腳本,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-06-06