淺析Python如何實現Celery任務隊列系統
系統要求
Python 3.x
Redis 服務器
依賴包:
- celery==5.3.6
- redis==5.0.1
系統架構
系統主要由以下組件構成:
- 任務定義模塊 (tasks.py):包含所有可執(zhí)行的任務定義
- 主程序模塊 (main.py):負責任務的調度和監(jiān)控
- Redis 服務器:作為消息代理(Broker)和結果后端(Result Backend)
配置說明
Celery 配置
broker_url = 'redis://:123456@127.0.0.1:6379/1' result_backend = 'redis://:123456@127.0.0.1:6379/1'
主要配置項:
- 任務序列化:JSON
- 時區(qū):Asia/Shanghai
- 工作進程數:1
功能模塊
1. 基礎運算任務
add(x, y): 加法運算
multiply(x, y): 乘法運算
chain_calculation(numbers): 鏈式計算(求和、平均值、最大值、最小值)
2. 文本處理任務
process_text(text): 文本處理(大寫轉換、長度統計、單詞計數)
3. 系統監(jiān)控任務
system_monitor(): 每5秒執(zhí)行一次,監(jiān)控系統狀態(tài)
- CPU使用率
- 內存使用率
- 系統狀態(tài)
4. 報告生成任務
generate_report(): 生成實時報告
daily_report(): 每天早上9點生成日報
workday_task(): 工作日每小時執(zhí)行的任務
定時任務配置
系統包含以下定時任務:
- 系統監(jiān)控:每5秒執(zhí)行一次
- 日報生成:每天早上9點執(zhí)行
- 工作日任務:工作日(周一至周五)9:00-18:00每小時執(zhí)行
使用說明
1. 啟動系統
確保Redis服務器已啟動
啟動Celery工作進程:
celery -A tasks worker --loglevel=info
啟動Celery Beat進程(用于定時任務):
celery -A tasks beat
運行主程序:
python main.py
2. 系統監(jiān)控
主程序運行后會自動執(zhí)行以下操作:
- 實時顯示系統監(jiān)控數據
- 執(zhí)行常規(guī)任務示例
- 通過按下 Ctrl+C 可以優(yōu)雅退出程序
錯誤處理
系統實現了完整的錯誤處理機制:
- 任務執(zhí)行錯誤捕獲和日志記錄
- 優(yōu)雅的程序退出處理
- 自動重試機制
注意事項
Redis連接配置需要根據實際環(huán)境修改
確保系統時區(qū)設置正確
建議在生產環(huán)境中調整工作進程數
監(jiān)控數據目前為模擬數據,實際使用時需要替換為真實的系統監(jiān)控指標
代碼示例
任務執(zhí)行示例:
# 執(zhí)行加法任務
result = add.delay(4, 6)
print(f"任務ID: {result.id}")
if result.ready():
print(f"結果: {result.get()}")
系統監(jiān)控示例
main.py
# 執(zhí)行系統監(jiān)控
result = system_monitor.delay()
data = result.get()
print(f"CPU使用率: {data['cpu_usage']:.1f}%")
print(f"內存使用率: {data['memory_usage']:.1f}%")
### 所有代碼:
```python
from tasks import add, multiply, process_text, generate_report, chain_calculation, system_monitor
import time
import json
from datetime import datetime
import threading
import signal
import sys
from celery.result import AsyncResult
# 全局變量控制程序運行
running = True
def signal_handler(signum, frame):
"""處理退出信號"""
global running
print("\n收到退出信號,正在關閉程序...")
running = False
def monitor_system_task():
"""監(jiān)控系統任務的執(zhí)行結果"""
while running:
try:
# 執(zhí)行系統監(jiān)控任務
result = system_monitor.delay()
# 等待結果(最多等待4秒)
for _ in range(4):
if result.ready():
data = result.get()
if data:
print(f"\n系統監(jiān)控結果:")
print(f"CPU使用率: {data['cpu_usage']:.1f}%")
print(f"內存使用率: {data['memory_usage']:.1f}%")
print(f"系統狀態(tài): {data['status']}")
print("-" * 50)
break
time.sleep(1)
# 等待剩余時間,確保大約每5秒執(zhí)行一次
time.sleep(1)
except Exception as e:
print(f"監(jiān)控任務出錯: {e}")
time.sleep(5)
def run_regular_task():
"""運行普通任務的示例"""
while running:
try:
# 執(zhí)行一些常規(guī)任務
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 執(zhí)行常規(guī)任務...")
# 1. 執(zhí)行加法任務
result = add.delay(4, 6)
print(f"加法任務ID: {result.id}")
if result.ready():
print(f"4 + 6 = {result.get()}")
# 2. 執(zhí)行文本處理
text = f"這是一條測試消息 - {datetime.now()}"
result = process_text.delay(text)
print(f"文本處理任務ID: {result.id}")
if result.ready():
print(json.dumps(result.get(), indent=2, ensure_ascii=False))
# 休眠5秒后繼續(xù)下一輪
for _ in range(5):
if not running:
break
time.sleep(1)
except Exception as e:
print(f"執(zhí)行任務時出錯: {e}")
time.sleep(5)
def main():
"""主函數"""
# 注冊信號處理器(用于優(yōu)雅退出)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
print("程序啟動...")
print("提示:按 Ctrl+C 可以優(yōu)雅退出程序")
print("\n=== 主程序開始運行 ===")
print("- 系統監(jiān)控每5秒執(zhí)行一次")
print("- 常規(guī)任務每5秒執(zhí)行一次")
print("- 所有任務的執(zhí)行結果會實時顯示")
try:
# 創(chuàng)建并啟動監(jiān)控線程
monitor_thread = threading.Thread(target=monitor_system_task)
monitor_thread.daemon = True
monitor_thread.start()
# 創(chuàng)建并啟動常規(guī)任務線程
task_thread = threading.Thread(target=run_regular_task)
task_thread.daemon = True
task_thread.start()
# 主線程保持運行
while running:
time.sleep(1)
except KeyboardInterrupt:
print("\n程序正在關閉...")
finally:
print("程序已退出。")
if __name__ == "__main__":
main()
tasks.py
from celery import Celery
from celery.schedules import crontab
import time
from datetime import datetime
import random
# 創(chuàng)建 Celery 實例
app = Celery('tasks')
# 配置 Celery
app.conf.update(
broker_url='redis://:123456@127.0.0.1:6379/1',
result_backend='redis://:123456@127.0.0.1:6379/1',
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Asia/Shanghai',
enable_utc=True,
worker_pool_restarts=True,
worker_concurrency=1,
)
# 配置定時任務
app.conf.beat_schedule = {
# 每5秒執(zhí)行一次系統監(jiān)控
'monitor-every-5-seconds': {
'task': 'tasks.system_monitor',
'schedule': 5.0, # 每5秒執(zhí)行一次
},
# 每天早上9點執(zhí)行
'daily-morning-report': {
'task': 'tasks.daily_report',
'schedule': crontab(hour=9, minute=0),
},
# 工作日每小時執(zhí)行
'workday-hourly-task': {
'task': 'tasks.workday_task',
'schedule': crontab(hour='9-18', minute=0, day_of_week='1-5'),
}
}
@app.task
def add(x, y):
"""簡單的加法任務"""
time.sleep(1)
return x + y
@app.task
def multiply(x, y):
"""乘法運算任務"""
time.sleep(2)
return x * y
@app.task
def process_text(text):
"""文本處理任務"""
time.sleep(1)
result = {
'original': text,
'upper': text.upper(),
'length': len(text),
'words': len(text.split())
}
return result
@app.task
def generate_report():
"""生成報告任務"""
time.sleep(3)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
data = {
'timestamp': current_time,
'temperature': random.uniform(20, 30),
'humidity': random.uniform(40, 80),
'status': random.choice(['正常', '警告', '錯誤'])
}
return data
@app.task
def chain_calculation(numbers):
"""鏈式計算任務"""
time.sleep(2)
result = sum(numbers)
average = result / len(numbers)
maximum = max(numbers)
minimum = min(numbers)
return {
'sum': result,
'average': average,
'max': maximum,
'min': minimum,
'count': len(numbers)
}
@app.task
def system_monitor():
"""每5秒執(zhí)行一次的系統監(jiān)控任務"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n[{current_time}] 每5秒執(zhí)行一次的系統監(jiān)控任務 執(zhí)行系統監(jiān)控...")
# 模擬獲取系統信息
data = {
'timestamp': current_time,
'cpu_usage': random.uniform(0, 100),
'memory_usage': random.uniform(0, 100),
'status': 'running'
}
# 打印監(jiān)控信息
print(f"CPU使用率: {data['cpu_usage']:.1f}%")
print(f"內存使用率: {data['memory_usage']:.1f}%")
print(f"系統狀態(tài): {data['status']}")
print("-" * 50)
return data
@app.task
def daily_report():
"""每天早上9點執(zhí)行的日報任務"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n[{current_time}] 生成每日報告...")
report = {
'report_type': '日報',
'generated_at': current_time,
'summary': '這是一個自動生成的日報示例',
'metrics': {
'total_tasks': random.randint(100, 1000),
'completed_tasks': random.randint(50, 500),
'success_rate': random.uniform(0.8, 1.0)
}
}
print(f"報告類型: {report['report_type']}")
print(f"生成時間: {report['generated_at']}")
print(f"任務完成率: {report['metrics']['success_rate']:.1%}")
print("-" * 50)
return report
@app.task
def workday_task():
"""工作日每小時執(zhí)行的任務"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n[{current_time}] 執(zhí)行工作時間任務...")
data = {
'task_type': '工作時間任務',
'executed_at': current_time,
'status': random.choice(['完成', '進行中', '計劃中']),
'workload': random.randint(1, 100)
}
print(f"任務狀態(tài): {data['status']}")
print(f"工作負載: {data['workload']}%")
print("-" * 50)
return data
以上就是淺析Python如何實現Celery任務隊列系統的詳細內容,更多關于Python Celery任務隊列的資料請關注腳本之家其它相關文章!
相關文章
python使用paramiko模塊通過ssh2協議對交換機進行配置的方法
今天小編就為大家分享一篇python使用paramiko模塊通過ssh2協議對交換機進行配置的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-07-07
pycharm打包py項目為.exe可執(zhí)行文件的兩種方式
本文主要介紹了pycharm打包py項目為.exe可執(zhí)行文件的兩種方式,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-01-01
python 通過SSHTunnelForwarder隧道連接redis的方法
今天小編就為大家分享一篇python 通過SSHTunnelForwarder隧道連接redis的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-02-02

