詳解分布式任務(wù)隊(duì)列Celery使用說明
起步
Celery 是一個(gè)簡(jiǎn)單、靈活且可靠的,處理大量消息的分布式系統(tǒng),并且提供維護(hù)這樣一個(gè)系統(tǒng)的必需工具。它是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列,同時(shí)也支持任務(wù)調(diào)度。
運(yùn)行模式是生產(chǎn)者消費(fèi)者模式:

任務(wù)隊(duì)列:任務(wù)隊(duì)列是一種在線程或機(jī)器間分發(fā)任務(wù)的機(jī)制。
消息隊(duì)列:消息隊(duì)列的輸入是工作的一個(gè)單元,稱為任務(wù),獨(dú)立的職程(Worker)進(jìn)程持續(xù)監(jiān)視隊(duì)列中是否有需要處理的新任務(wù)。
Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個(gè)過程從客戶端向隊(duì)列添加消息開始,之后中間人把消息派送給職程,職程對(duì)消息進(jìn)行處理。
Celery的架構(gòu)由三部分組成,消息中間件(message broker),任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(chǔ)(task result store)組成。
消息中間件:Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ, Redis, MongoDB等,本文使用 redis 。
任務(wù)執(zhí)行單元:Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中
任務(wù)結(jié)果存儲(chǔ):Task result store用來存儲(chǔ)Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲(chǔ)任務(wù)的結(jié)果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲(chǔ)的,就先選用Redis來存儲(chǔ)任務(wù)執(zhí)行結(jié)果。
安裝
通過 pip 命令即可安裝:
pip install celery
本文使用 redis 做消息中間件,所以需要在安裝:
pip install redis
redis軟件也要安裝,官網(wǎng)只提供了 linux 版本的下載:https://redis.io/download,windows 的可以到 https://github.com/MicrosoftArchive/redis 下載 exe 安裝包。
簡(jiǎn)單的demo
為了運(yùn)行一個(gè)簡(jiǎn)單的任務(wù),從中說明 celery 的使用方式。在項(xiàng)目文件夾內(nèi)創(chuàng)建 app.py 和 tasks.py 。tasks.py 用來定義任務(wù):
# tasks.py
import time
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('my_tasks', broker=broker, backend=backend)
@app.task
def add(x, y):
print('enter task')
time.sleep(3)
return x + y
這些代碼做了什么事。 broker 指定任務(wù)隊(duì)列的消息中間件,backend 指定了任務(wù)執(zhí)行結(jié)果的存儲(chǔ)。app 就是我們創(chuàng)建的 Celery 對(duì)象。通過 app.task 修飾器將 add 函數(shù)變成一個(gè)一部的任務(wù)。
# app.py
from tasks import add
if __name__ == '__main__':
print('start task')
result = add.delay(2, 18)
print('end task')
print(result)
add.delay 函數(shù)將任務(wù)序列化發(fā)送到消息中間件。終端執(zhí)行 python app.py 可以看到輸出一個(gè)任務(wù)的唯一識(shí)別:
start task
end task
79ef4736-1ecb-4afd-aa5e-b532657acd43
這個(gè)只是將任務(wù)推送到 redis,任務(wù)還沒被消費(fèi),任務(wù)會(huì)在 celery 隊(duì)列中。
開啟 celery woker 可以將任務(wù)進(jìn)行消費(fèi):
celery worker -A tasks -l info # -A 后是模塊名
A 參數(shù)指定了celery 對(duì)象的位置,l 參數(shù)指定woker的日志級(jí)別。
如果此命令在終端報(bào)錯(cuò):
File "e:\workspace\.env\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task
tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
這是win 10 在使用 Celery 4.x 的時(shí)候會(huì)有這個(gè)問題,解決方式可以是改用 Celery 3.x 版本,或者按照 Unable to run tasks under Windows 上提供的方式,該issue提供了兩種方式解決,一種是安裝 eventlet 擴(kuò)展:
pip install eventlet celery -A <mymodule> worker -l info -P eventlet
另一種方式是添加個(gè) FORKED_BY_MULTIPROCESSING = 1 的環(huán)境變量(推薦這種方式):
import os
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
如果一切順利,woker 正常啟動(dòng),就能在終端看到任務(wù)被消費(fèi)了:
[2018-11-27 13:59:27,830: INFO/MainProcess] Received task: tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19]
[2018-11-27 13:59:27,831: WARNING/SpawnPoolWorker-2] enter task
[2018-11-27 13:59:30,835: INFO/SpawnPoolWorker-2] Task tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19] succeeded in 3.0s: 20
說明我們的demo已經(jīng)成功了。
使用配置文件
在上面的demo中,是將broker和backend直接寫在代碼中的,而 Celery 還有其他配置,最好是寫出配置文件的形式,基本配置項(xiàng)有:
- CELERY_DEFAULT_QUEUE:默認(rèn)隊(duì)列
- BROKER_URL : 代理人的網(wǎng)址
- CELERY_RESULT_BACKEND:結(jié)果存儲(chǔ)地址
- CELERY_TASK_SERIALIZER:任務(wù)序列化方式
- CELERY_RESULT_SERIALIZER:任務(wù)執(zhí)行結(jié)果序列化方式
- CELERY_TASK_RESULT_EXPIRES:任務(wù)過期時(shí)間
- CELERY_ACCEPT_CONTENT:指定任務(wù)接受的內(nèi)容序列化類型(序列化),一個(gè)列表;
整理一下目錄結(jié)構(gòu),將我們的任務(wù)封裝成包:

內(nèi)容如下:
# __init__.py
import os
from celery import Celery
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
app = Celery('demo')
# 通過 Celery 實(shí)例加載配置模塊
app.config_from_object('celery_app.celery_config')
# celery_config.py
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
# UTC
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'
# 導(dǎo)入指定的任務(wù)模塊
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2',
)
# task1.py
import time
from celery_app import app
@app.task
def add(x, y):
print('enter task')
time.sleep(3)
return x + y
# task2.py
import time
from celery_app import app
@app.task
def mul(x, y):
print('enter task')
time.sleep(4)
return x * y
# app.py
from celery_app import task1
if __name__ == '__main__':
pass
print('start task')
result = task1.add.delay(2, 18)
print('end task')
print(result)
提交任務(wù)與啟動(dòng)worker:
$ python app.py $ celery worker -A celery_app -l info
result = task1.add.delay(2, 18) 返回的是一個(gè)任務(wù)對(duì)象,通過 delay 函數(shù)的方式可以發(fā)現(xiàn)這個(gè)過程是非阻塞的,這個(gè)任務(wù)對(duì)象有一個(gè)方法:
r.ready() # 查看任務(wù)狀態(tài),返回布爾值, 任務(wù)執(zhí)行完成, 返回 True, 否則返回 False. r.wait() # 等待任務(wù)完成, 返回任務(wù)執(zhí)行結(jié)果,很少使用; r.get(timeout=1) # 獲取任務(wù)執(zhí)行結(jié)果,可以設(shè)置等待時(shí)間 r.result # 任務(wù)執(zhí)行結(jié)果. r.state # PENDING, START, SUCCESS,任務(wù)當(dāng)前的狀態(tài) r.status # PENDING, START, SUCCESS,任務(wù)當(dāng)前的狀態(tài) r.successful # 任務(wù)成功返回true r.traceback # 如果任務(wù)拋出了一個(gè)異常,你也可以獲取原始的回溯信息
定時(shí)任務(wù)
定時(shí)任務(wù)的功能類似 crontab,可以完成每日統(tǒng)計(jì)任務(wù)等。首先我們需要配置一下 schedule,通過改造上面的配置文件,添加 CELERYBEAT_SCHEDULE 配置:
import datetime
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'task1-every-1-min': {
'task': 'celery_app.task1.add',
'schedule': datetime.timedelta(seconds=60),
'args': (2, 15),
},
'task2-once-a-day': {
'task': 'celery_app.task2.mul',
'schedule': crontab(hour=15, minute=23),
'args': (3, 6),
}
}
task 指定要執(zhí)行的任務(wù);schedule 表示計(jì)劃的時(shí)間,datetime.timedelta(seconds=60) 表示間隔一分鐘,這里其實(shí)也可以是 crontab(minute='*/1') 來替換;args 表示要傳遞的參數(shù)。
啟動(dòng) celery beat:
$ celery worker -A celery_app -l info

我們目前是用兩個(gè)窗口來執(zhí)行 woker 和 beat 。當(dāng)然也可以只使用一個(gè)窗口來運(yùn)行(僅限linux系統(tǒng)):
$ celery -B -A celery_app worker -l info
celery.task 裝飾器
@celery.task() def name(): pass
task() 方法將任務(wù)修飾成異步, name 可以顯示指定的任務(wù)名字;serializer 指定序列化的方式;bind 一個(gè)bool值,若為True,則task實(shí)例會(huì)作為第一個(gè)參數(shù)傳遞到任務(wù)方法中,可以訪問task實(shí)例的所有的屬性,即前面反序列化中那些屬性。
@task(bind=True) # 第一個(gè)參數(shù)是self,使用self.request訪問相關(guān)的屬性 def add(self, x, y): logger.info(self.request.id)
base 可以指定任務(wù)積累,可以用來定義回調(diào)函數(shù):
import celery
class MyTask(celery.Task):
# 任務(wù)失敗時(shí)執(zhí)行
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
# 任務(wù)成功時(shí)執(zhí)行
def on_success(self, retval, task_id, args, kwargs):
pass
# 任務(wù)重試時(shí)執(zhí)行
def on_retry(self, exc, task_id, args, kwargs, einfo):
pass
@task(base=MyTask)
def add(x, y):
raise KeyError()
exc:失敗時(shí)的錯(cuò)誤的類型;
task_id:任務(wù)的id;
args:任務(wù)函數(shù)的參數(shù);
kwargs:參數(shù);
einfo:失敗時(shí)的異常詳細(xì)信息;
retval:任務(wù)成功執(zhí)行的返回值;
總結(jié)
網(wǎng)上找了一份比較常用的配置文件,需要的時(shí)候可以參考下:
# 注意,celery4版本后,CELERY_BROKER_URL改為BROKER_URL
BROKER_URL = 'amqp://username:passwd@host:port/虛擬主機(jī)名'
# 指定結(jié)果的接受地址
CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
# 指定任務(wù)序列化方式
CELERY_TASK_SERIALIZER = 'msgpack'
# 指定結(jié)果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
# 任務(wù)過期時(shí)間,celery任務(wù)執(zhí)行結(jié)果的超時(shí)時(shí)間
CELERY_TASK_RESULT_EXPIRES = 60 * 20
# 指定任務(wù)接受的序列化類型.
CELERY_ACCEPT_CONTENT = ["msgpack"]
# 任務(wù)發(fā)送完成是否需要確認(rèn),這一項(xiàng)對(duì)性能有一點(diǎn)影響
CELERY_ACKS_LATE = True
# 壓縮方案選擇,可以是zlib, bzip2,默認(rèn)是發(fā)送沒有壓縮的數(shù)據(jù)
CELERY_MESSAGE_COMPRESSION = 'zlib'
# 規(guī)定完成任務(wù)的時(shí)間
CELERYD_TASK_TIME_LIMIT = 5 # 在5s內(nèi)完成任務(wù),否則執(zhí)行該任務(wù)的worker將被殺死,任務(wù)移交給父進(jìn)程
# celery worker的并發(fā)數(shù),默認(rèn)是服務(wù)器的內(nèi)核數(shù)目,也是命令行-c參數(shù)指定的數(shù)目
CELERYD_CONCURRENCY = 4
# celery worker 每次去rabbitmq預(yù)取任務(wù)的數(shù)量
CELERYD_PREFETCH_MULTIPLIER = 4
# 每個(gè)worker執(zhí)行了多少任務(wù)就會(huì)死掉,默認(rèn)是無限的
CELERYD_MAX_TASKS_PER_CHILD = 40
# 這是使用了django-celery默認(rèn)的數(shù)據(jù)庫調(diào)度模型,任務(wù)執(zhí)行周期都被存在你指定的orm數(shù)據(jù)庫中
# CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# 設(shè)置默認(rèn)的隊(duì)列名稱,如果一個(gè)消息不符合其他的隊(duì)列就會(huì)放在默認(rèn)隊(duì)列里面,如果什么都不設(shè)置的話,數(shù)據(jù)都會(huì)發(fā)送到默認(rèn)的隊(duì)列中
CELERY_DEFAULT_QUEUE = "default"
# 設(shè)置詳細(xì)的隊(duì)列
CELERY_QUEUES = {
"default": { # 這是上面指定的默認(rèn)隊(duì)列
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default"
},
"topicqueue": { # 這是一個(gè)topic隊(duì)列 凡是topictest開頭的routing key都會(huì)被放到這個(gè)隊(duì)列
"routing_key": "topic.#",
"exchange": "topic_exchange",
"exchange_type": "topic",
},
"task_eeg": { # 設(shè)置扇形交換機(jī)
"exchange": "tasks",
"exchange_type": "fanout",
"binding_key": "tasks",
},
}
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python網(wǎng)絡(luò)爬蟲四大選擇器用法原理總結(jié)
這篇文章主要介紹了Python網(wǎng)絡(luò)爬蟲四大選擇器用法原理總結(jié),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06
完美解決keras 讀取多個(gè)hdf5文件進(jìn)行訓(xùn)練的問題
這篇文章主要介紹了完美解決keras 讀取多個(gè)hdf5文件進(jìn)行訓(xùn)練的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-07-07
Python3實(shí)現(xiàn)轉(zhuǎn)換Image圖片格式
本篇文章給大家分享了Python3實(shí)現(xiàn)在線轉(zhuǎn)換Image圖片格式的功能以及相關(guān)實(shí)例代碼,有興趣的朋友參考下。2018-06-06
Pytorch中關(guān)于model.eval()的作用及分析
這篇文章主要介紹了Pytorch中關(guān)于model.eval()的作用及分析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02
python獲取一組數(shù)據(jù)里最大值max函數(shù)用法實(shí)例
這篇文章主要介紹了python獲取一組數(shù)據(jù)里最大值max函數(shù)用法,實(shí)例分析了max函數(shù)的使用技巧,需要的朋友可以參考下2015-05-05
python釘釘機(jī)器人運(yùn)維腳本監(jiān)控實(shí)例
今天小編就為大家分享一篇python釘釘機(jī)器人運(yùn)維腳本監(jiān)控實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-02-02
Python中Scipy庫在信號(hào)處理中的應(yīng)用詳解
信號(hào)處理作為數(shù)字信號(hào)處理領(lǐng)域的關(guān)鍵技術(shù),涵蓋了從信號(hào)獲取、傳輸、存儲(chǔ)到最終應(yīng)用的一系列處理步驟,在這篇博客中,我們將深入探討Python中Scipy庫在信號(hào)處理領(lǐng)域的應(yīng)用,需要的朋友可以參考下2023-12-12

