淺析Python如何實(shí)現(xiàn)Celery任務(wù)隊(duì)列系統(tǒng)
系統(tǒng)要求
Python 3.x
Redis 服務(wù)器
依賴包:
- celery==5.3.6
- redis==5.0.1
系統(tǒng)架構(gòu)
系統(tǒng)主要由以下組件構(gòu)成:
- 任務(wù)定義模塊 (tasks.py):包含所有可執(zhí)行的任務(wù)定義
- 主程序模塊 (main.py):負(fù)責(zé)任務(wù)的調(diào)度和監(jiān)控
- Redis 服務(wù)器:作為消息代理(Broker)和結(jié)果后端(Result Backend)
配置說明
Celery 配置
broker_url = 'redis://:123456@127.0.0.1:6379/1' result_backend = 'redis://:123456@127.0.0.1:6379/1'
主要配置項(xiàng):
- 任務(wù)序列化:JSON
- 時(shí)區(qū):Asia/Shanghai
- 工作進(jìn)程數(shù):1
功能模塊
1. 基礎(chǔ)運(yùn)算任務(wù)
add(x, y): 加法運(yùn)算
multiply(x, y): 乘法運(yùn)算
chain_calculation(numbers): 鏈?zhǔn)接?jì)算(求和、平均值、最大值、最小值)
2. 文本處理任務(wù)
process_text(text): 文本處理(大寫轉(zhuǎn)換、長度統(tǒng)計(jì)、單詞計(jì)數(shù))
3. 系統(tǒng)監(jiān)控任務(wù)
system_monitor(): 每5秒執(zhí)行一次,監(jiān)控系統(tǒng)狀態(tài)
- CPU使用率
- 內(nèi)存使用率
- 系統(tǒng)狀態(tài)
4. 報(bào)告生成任務(wù)
generate_report(): 生成實(shí)時(shí)報(bào)告
daily_report(): 每天早上9點(diǎn)生成日?qǐng)?bào)
workday_task(): 工作日每小時(shí)執(zhí)行的任務(wù)
定時(shí)任務(wù)配置
系統(tǒng)包含以下定時(shí)任務(wù):
- 系統(tǒng)監(jiān)控:每5秒執(zhí)行一次
- 日?qǐng)?bào)生成:每天早上9點(diǎn)執(zhí)行
- 工作日任務(wù):工作日(周一至周五)9:00-18:00每小時(shí)執(zhí)行
使用說明
1. 啟動(dòng)系統(tǒng)
確保Redis服務(wù)器已啟動(dòng)
啟動(dòng)Celery工作進(jìn)程:
celery -A tasks worker --loglevel=info
啟動(dòng)Celery Beat進(jìn)程(用于定時(shí)任務(wù)):
celery -A tasks beat
運(yùn)行主程序:
python main.py
2. 系統(tǒng)監(jiān)控
主程序運(yùn)行后會(huì)自動(dòng)執(zhí)行以下操作:
- 實(shí)時(shí)顯示系統(tǒng)監(jiān)控?cái)?shù)據(jù)
- 執(zhí)行常規(guī)任務(wù)示例
- 通過按下 Ctrl+C 可以優(yōu)雅退出程序
錯(cuò)誤處理
系統(tǒng)實(shí)現(xiàn)了完整的錯(cuò)誤處理機(jī)制:
- 任務(wù)執(zhí)行錯(cuò)誤捕獲和日志記錄
- 優(yōu)雅的程序退出處理
- 自動(dòng)重試機(jī)制
注意事項(xiàng)
Redis連接配置需要根據(jù)實(shí)際環(huán)境修改
確保系統(tǒng)時(shí)區(qū)設(shè)置正確
建議在生產(chǎn)環(huán)境中調(diào)整工作進(jìn)程數(shù)
監(jiān)控?cái)?shù)據(jù)目前為模擬數(shù)據(jù),實(shí)際使用時(shí)需要替換為真實(shí)的系統(tǒng)監(jiān)控指標(biāo)
代碼示例
任務(wù)執(zhí)行示例:
# 執(zhí)行加法任務(wù) result = add.delay(4, 6) print(f"任務(wù)ID: {result.id}") if result.ready(): print(f"結(jié)果: {result.get()}")
系統(tǒng)監(jiān)控示例
main.py
# 執(zhí)行系統(tǒng)監(jiān)控 result = system_monitor.delay() data = result.get() print(f"CPU使用率: {data['cpu_usage']:.1f}%") print(f"內(nèi)存使用率: {data['memory_usage']:.1f}%") ### 所有代碼: ```python from tasks import add, multiply, process_text, generate_report, chain_calculation, system_monitor import time import json from datetime import datetime import threading import signal import sys from celery.result import AsyncResult # 全局變量控制程序運(yùn)行 running = True def signal_handler(signum, frame): """處理退出信號(hào)""" global running print("\n收到退出信號(hào),正在關(guān)閉程序...") running = False def monitor_system_task(): """監(jiān)控系統(tǒng)任務(wù)的執(zhí)行結(jié)果""" while running: try: # 執(zhí)行系統(tǒng)監(jiān)控任務(wù) result = system_monitor.delay() # 等待結(jié)果(最多等待4秒) for _ in range(4): if result.ready(): data = result.get() if data: print(f"\n系統(tǒng)監(jiān)控結(jié)果:") print(f"CPU使用率: {data['cpu_usage']:.1f}%") print(f"內(nèi)存使用率: {data['memory_usage']:.1f}%") print(f"系統(tǒng)狀態(tài): {data['status']}") print("-" * 50) break time.sleep(1) # 等待剩余時(shí)間,確保大約每5秒執(zhí)行一次 time.sleep(1) except Exception as e: print(f"監(jiān)控任務(wù)出錯(cuò): {e}") time.sleep(5) def run_regular_task(): """運(yùn)行普通任務(wù)的示例""" while running: try: # 執(zhí)行一些常規(guī)任務(wù) print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 執(zhí)行常規(guī)任務(wù)...") # 1. 執(zhí)行加法任務(wù) result = add.delay(4, 6) print(f"加法任務(wù)ID: {result.id}") if result.ready(): print(f"4 + 6 = {result.get()}") # 2. 執(zhí)行文本處理 text = f"這是一條測(cè)試消息 - {datetime.now()}" result = process_text.delay(text) print(f"文本處理任務(wù)ID: {result.id}") if result.ready(): print(json.dumps(result.get(), indent=2, ensure_ascii=False)) # 休眠5秒后繼續(xù)下一輪 for _ in range(5): if not running: break time.sleep(1) except Exception as e: print(f"執(zhí)行任務(wù)時(shí)出錯(cuò): {e}") time.sleep(5) def main(): """主函數(shù)""" # 注冊(cè)信號(hào)處理器(用于優(yōu)雅退出) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) print("程序啟動(dòng)...") print("提示:按 Ctrl+C 可以優(yōu)雅退出程序") print("\n=== 主程序開始運(yùn)行 ===") print("- 系統(tǒng)監(jiān)控每5秒執(zhí)行一次") print("- 常規(guī)任務(wù)每5秒執(zhí)行一次") print("- 所有任務(wù)的執(zhí)行結(jié)果會(huì)實(shí)時(shí)顯示") try: # 創(chuàng)建并啟動(dòng)監(jiān)控線程 monitor_thread = threading.Thread(target=monitor_system_task) monitor_thread.daemon = True monitor_thread.start() # 創(chuàng)建并啟動(dòng)常規(guī)任務(wù)線程 task_thread = threading.Thread(target=run_regular_task) task_thread.daemon = True task_thread.start() # 主線程保持運(yùn)行 while running: time.sleep(1) except KeyboardInterrupt: print("\n程序正在關(guān)閉...") finally: print("程序已退出。") if __name__ == "__main__": main()
tasks.py
from celery import Celery from celery.schedules import crontab import time from datetime import datetime import random # 創(chuàng)建 Celery 實(shí)例 app = Celery('tasks') # 配置 Celery app.conf.update( broker_url='redis://:123456@127.0.0.1:6379/1', result_backend='redis://:123456@127.0.0.1:6379/1', task_serializer='json', result_serializer='json', accept_content=['json'], timezone='Asia/Shanghai', enable_utc=True, worker_pool_restarts=True, worker_concurrency=1, ) # 配置定時(shí)任務(wù) app.conf.beat_schedule = { # 每5秒執(zhí)行一次系統(tǒng)監(jiān)控 'monitor-every-5-seconds': { 'task': 'tasks.system_monitor', 'schedule': 5.0, # 每5秒執(zhí)行一次 }, # 每天早上9點(diǎn)執(zhí)行 'daily-morning-report': { 'task': 'tasks.daily_report', 'schedule': crontab(hour=9, minute=0), }, # 工作日每小時(shí)執(zhí)行 'workday-hourly-task': { 'task': 'tasks.workday_task', 'schedule': crontab(hour='9-18', minute=0, day_of_week='1-5'), } } @app.task def add(x, y): """簡單的加法任務(wù)""" time.sleep(1) return x + y @app.task def multiply(x, y): """乘法運(yùn)算任務(wù)""" time.sleep(2) return x * y @app.task def process_text(text): """文本處理任務(wù)""" time.sleep(1) result = { 'original': text, 'upper': text.upper(), 'length': len(text), 'words': len(text.split()) } return result @app.task def generate_report(): """生成報(bào)告任務(wù)""" time.sleep(3) current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") data = { 'timestamp': current_time, 'temperature': random.uniform(20, 30), 'humidity': random.uniform(40, 80), 'status': random.choice(['正常', '警告', '錯(cuò)誤']) } return data @app.task def chain_calculation(numbers): """鏈?zhǔn)接?jì)算任務(wù)""" time.sleep(2) result = sum(numbers) average = result / len(numbers) maximum = max(numbers) minimum = min(numbers) return { 'sum': result, 'average': average, 'max': maximum, 'min': minimum, 'count': len(numbers) } @app.task def system_monitor(): """每5秒執(zhí)行一次的系統(tǒng)監(jiān)控任務(wù)""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"\n[{current_time}] 每5秒執(zhí)行一次的系統(tǒng)監(jiān)控任務(wù) 執(zhí)行系統(tǒng)監(jiān)控...") # 模擬獲取系統(tǒng)信息 data = { 'timestamp': current_time, 'cpu_usage': random.uniform(0, 100), 'memory_usage': random.uniform(0, 100), 'status': 'running' } # 打印監(jiān)控信息 print(f"CPU使用率: {data['cpu_usage']:.1f}%") print(f"內(nèi)存使用率: {data['memory_usage']:.1f}%") print(f"系統(tǒng)狀態(tài): {data['status']}") print("-" * 50) return data @app.task def daily_report(): """每天早上9點(diǎn)執(zhí)行的日?qǐng)?bào)任務(wù)""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"\n[{current_time}] 生成每日?qǐng)?bào)告...") report = { 'report_type': '日?qǐng)?bào)', 'generated_at': current_time, 'summary': '這是一個(gè)自動(dòng)生成的日?qǐng)?bào)示例', 'metrics': { 'total_tasks': random.randint(100, 1000), 'completed_tasks': random.randint(50, 500), 'success_rate': random.uniform(0.8, 1.0) } } print(f"報(bào)告類型: {report['report_type']}") print(f"生成時(shí)間: {report['generated_at']}") print(f"任務(wù)完成率: {report['metrics']['success_rate']:.1%}") print("-" * 50) return report @app.task def workday_task(): """工作日每小時(shí)執(zhí)行的任務(wù)""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"\n[{current_time}] 執(zhí)行工作時(shí)間任務(wù)...") data = { 'task_type': '工作時(shí)間任務(wù)', 'executed_at': current_time, 'status': random.choice(['完成', '進(jìn)行中', '計(jì)劃中']), 'workload': random.randint(1, 100) } print(f"任務(wù)狀態(tài): {data['status']}") print(f"工作負(fù)載: {data['workload']}%") print("-" * 50) return data
以上就是淺析Python如何實(shí)現(xiàn)Celery任務(wù)隊(duì)列系統(tǒng)的詳細(xì)內(nèi)容,更多關(guān)于Python Celery任務(wù)隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
python str字符串轉(zhuǎn)uuid實(shí)例
這篇文章主要介紹了python str字符串轉(zhuǎn)uuid實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-03-03Pygame Transform圖像變形的實(shí)現(xiàn)示例
pygame.transform 模塊允許您對(duì)加載、創(chuàng)建后的圖像進(jìn)行一系列操作,比如調(diào)整圖像大小、旋轉(zhuǎn)圖片等操作,感興趣的可以了解一下2021-11-11Django 實(shí)現(xiàn)外鍵去除自動(dòng)添加的后綴‘_id’
今天小編就為大家分享一篇Django 實(shí)現(xiàn)外鍵去除自動(dòng)添加的后綴‘_id’,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-11-11python使用paramiko模塊通過ssh2協(xié)議對(duì)交換機(jī)進(jìn)行配置的方法
今天小編就為大家分享一篇python使用paramiko模塊通過ssh2協(xié)議對(duì)交換機(jī)進(jìn)行配置的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-07-07pycharm打包py項(xiàng)目為.exe可執(zhí)行文件的兩種方式
本文主要介紹了pycharm打包py項(xiàng)目為.exe可執(zhí)行文件的兩種方式,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01pyCharm 實(shí)現(xiàn)關(guān)閉代碼檢查
這篇文章主要介紹了pyCharm 實(shí)現(xiàn)關(guān)閉代碼檢查,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-06-06python 通過SSHTunnelForwarder隧道連接redis的方法
今天小編就為大家分享一篇python 通過SSHTunnelForwarder隧道連接redis的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-02-02