django+celery+RabbitMQ自定義多個消息隊列的實現(xiàn)
關(guān)于django celery的使用網(wǎng)上有很多文章,本文就不多做更多的說明。
本文使用版本
- python==3.8.15
- Django==3.2.4
- celery==5.2.7
celery.py
from __future__ import absolute_import, unicode_literals import os from celery import Celery from kombu import Exchange, Queue # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'zkcelery.settings') app = Celery('zkcelery') # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() # 看了一篇文章說,如果使用redis做broker,exchange可以不配置;但如果使用rabbitMQ做broker,就必須要配置。 queue = ( Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'), Queue('q1', exchange=Exchange('e1', type='direct'), routing_key='r1'), Queue('q2', exchange=Exchange('e2', type='direct'), routing_key='r2'), Queue('q3', exchange=Exchange('e3', type='fanout'), routing_key='r3'), ) # 一旦配置了route后,所有的任務(wù)名都必須要指定route,否則任務(wù)無法執(zhí)行。 # 經(jīng)過測試,route匹配是最長匹配規(guī)則。 route = { 'apps.zhiding.tasks.add': {'queue': 'q1', 'routing_key': 'r1'}, 'apps.zhiding.tasks.multiply': {'queue': 'q2', 'routing_key': 'r2'}, # 其它的任務(wù)名稱,匹配這條路由 # 如果以上隊列的worker服務(wù)器壞了,這些任務(wù)會被全部放進這個隊列里,該隊列的worker將繼續(xù)處理這些任務(wù) # 下面這條隊列一定要配置,否則其它任務(wù)無法處理。 '*': {'queue': 'default', 'routing_key': 'default'}, } app.conf.update(CELERY_QUEUES=queue, CELERY_ROUTES=route)
tasks.py
from celery import shared_task import time @shared_task def add(x, y): time.sleep(2) print('任務(wù)睡眠2秒后執(zhí)行了') return x + y @shared_task def multiply(x, y): time.sleep(5) print('任務(wù)睡眠5秒后執(zhí)行了') return x * y @shared_task def sub(x, y): time.sleep(4) print('任務(wù)睡眠4秒后執(zhí)行了') return x - y
筆者也看了很多博文,在settings.py配置文件中寫入CELERY_QUEUES和CELERY_ROUTES,上面的配置對應(yīng)下來就是如下代碼塊:
CELERY_QUEUES = ( Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'), Queue('sq1', exchange=Exchange('sq1', type='direct'), routing_key='sq1'), Queue('sq2', exchange=Exchange('sq2', type='direct'), routing_key='sq2'), Queue('sq3', exchange=Exchange('sq3', type='fanout'), routing_key='sq3'), ) CELERY_ROUTES = { 'apps.zhiding.tasks.add': {'queue': 'sq1', 'routing_key': 'sq1'}, 'apps.zhiding.tasks.multiply': {'queue': 'sq2', 'routing_key': 'sq2'}, '*': {'queue': 'default', 'routing_key': 'default'}, }
但是筆者在實際使用中發(fā)現(xiàn)后面這種方式配置始終未生效,不知道是不是筆者版本的不同,沒有做更多的研究,如果你能找到問題的原因,歡迎評論交流。
啟動worker
# 筆者使用的windows,啟動時需要加上-P eventlet celery -A zkcelery worker -l info -P eventlet
啟動后隊列中出現(xiàn)配置中的個隊列
同時會在rabbitmq中創(chuàng)建(如果不存在)4個隊列,交換機和相應(yīng)的綁定關(guān)系(當然也可以直接通過rabbitmq管理端直接創(chuàng)建自己需要的隊列、交換機和綁定,具體根據(jù)個人習(xí)慣或者視工作場景而定選擇)
以隊列q1示例:
暫時先關(guān)閉worker,便于觀察消息隊列中的消息。
向隊列中發(fā)送幾條消息,消息均進入到配置中指定的queue中
再次啟動worker,隊列中的消息立馬被消費
如何做到消費指定的隊列中的消息,只需要啟動的時候加上參數(shù)Q
# -Q指定消費的隊列 # -n 指定worker節(jié)點的名稱,避免啟動多個時的重名沖突 celery -A zkcelery worker -l info -Q q1 -n node1 -P eventlet
可以看到終端中queues只有q1了
q1中的消息被消費掉了,其他隊列沒有變化
也可以同時指定多個消費隊列
celery -A zkcelery worker -l info -Q q2,default -n node2 -P eventlet
當然也可以在生產(chǎn)方指定推送的隊列,舉例如下:
到此這篇關(guān)于django+celery+RabbitMQ自定義多個消息隊列的實現(xiàn)的文章就介紹到這了,更多相關(guān)django celery RabbitMQ消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
將tensorflow.Variable中的某些元素取出組成一個新的矩陣示例
今天小編就為大家分享一篇將tensorflow.Variable中的某些元素取出組成一個新的矩陣示例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-01-01使用python實現(xiàn)學(xué)生信息管理系統(tǒng)
這篇文章主要為大家詳細介紹了使用python實現(xiàn)學(xué)生信息管理系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-02-02PyCharm使用教程之搭建Python開發(fā)環(huán)境
由于python的跨平臺性。在windows下和ubuntu下基本上沒什么差別。下面從幾個不步驟來搭建開發(fā)環(huán)境。2016-06-06Tensorflow模型實現(xiàn)預(yù)測或識別單張圖片
這篇文章主要為大家詳細介紹了Tensorflow模型實現(xiàn)預(yù)測或識別單張圖片,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-07-07pytorch中部分矩陣乘法和數(shù)組乘法的小結(jié)
本文主要介紹了pytorch中部分矩陣乘法和數(shù)組乘法的小結(jié),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03python中Tkinter詳細基礎(chǔ)教學(xué)實例代碼
這篇文章主要給大家介紹了關(guān)于python中Tkinter詳細基礎(chǔ)教學(xué)的相關(guān)資料,文中介紹了如Label、Button、Entry、Text、Frame、Menu、Canvas、Messagebox等的基本屬性和用法,并介紹了布局管理器pack、grid和place的使用方法,需要的朋友可以參考下2024-12-12Python使用add_subplot與subplot畫子圖操作示例
這篇文章主要介紹了Python使用add_subplot與subplot畫子圖操作,涉及Python使用matplotlib模塊進行圖形繪制的相關(guān)操作技巧,需要的朋友可以參考下2018-06-06Pandas數(shù)據(jù)分析之批量拆分/合并Excel
怎樣將一個大的Excel拆分,或者將很多小Excel文件合并?下面這篇文章主要給大家介紹了關(guān)于Pandas數(shù)據(jù)分析之批量拆分/合并Excel的相關(guān)資料,需要的朋友可以參考下2021-09-09