欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

淺析Python如何實(shí)現(xiàn)Celery任務(wù)隊(duì)列系統(tǒng)

 更新時(shí)間:2025年04月02日 09:31:12   作者:老大白菜  
這篇文章主要為大家詳細(xì)介紹了一個(gè)基于 Celery 和 Redis 的分布式任務(wù)隊(duì)列系統(tǒng),用于處理異步任務(wù)和定時(shí)任務(wù),希望對(duì)大家有一定的幫助

系統(tǒng)要求

Python 3.x

Redis 服務(wù)器

依賴包:

  • celery==5.3.6
  • redis==5.0.1

系統(tǒng)架構(gòu)

系統(tǒng)主要由以下組件構(gòu)成:

  • 任務(wù)定義模塊 (tasks.py):包含所有可執(zhí)行的任務(wù)定義
  • 主程序模塊 (main.py):負(fù)責(zé)任務(wù)的調(diào)度和監(jiān)控
  • Redis 服務(wù)器:作為消息代理(Broker)和結(jié)果后端(Result Backend)

配置說明

Celery 配置

broker_url = 'redis://:123456@127.0.0.1:6379/1'
result_backend = 'redis://:123456@127.0.0.1:6379/1'

主要配置項(xiàng):

  • 任務(wù)序列化:JSON
  • 時(shí)區(qū):Asia/Shanghai
  • 工作進(jìn)程數(shù):1

功能模塊

1. 基礎(chǔ)運(yùn)算任務(wù)

add(x, y): 加法運(yùn)算

multiply(x, y): 乘法運(yùn)算

chain_calculation(numbers): 鏈?zhǔn)接?jì)算(求和、平均值、最大值、最小值)

2. 文本處理任務(wù)

process_text(text): 文本處理(大寫轉(zhuǎn)換、長度統(tǒng)計(jì)、單詞計(jì)數(shù))

3. 系統(tǒng)監(jiān)控任務(wù)

system_monitor(): 每5秒執(zhí)行一次,監(jiān)控系統(tǒng)狀態(tài)

  • CPU使用率
  • 內(nèi)存使用率
  • 系統(tǒng)狀態(tài)

4. 報(bào)告生成任務(wù)

generate_report(): 生成實(shí)時(shí)報(bào)告

daily_report(): 每天早上9點(diǎn)生成日?qǐng)?bào)

workday_task(): 工作日每小時(shí)執(zhí)行的任務(wù)

定時(shí)任務(wù)配置

系統(tǒng)包含以下定時(shí)任務(wù):

  • 系統(tǒng)監(jiān)控:每5秒執(zhí)行一次
  • 日?qǐng)?bào)生成:每天早上9點(diǎn)執(zhí)行
  • 工作日任務(wù):工作日(周一至周五)9:00-18:00每小時(shí)執(zhí)行

使用說明

1. 啟動(dòng)系統(tǒng)

確保Redis服務(wù)器已啟動(dòng)

啟動(dòng)Celery工作進(jìn)程:

celery -A tasks worker --loglevel=info

啟動(dòng)Celery Beat進(jìn)程(用于定時(shí)任務(wù)):

celery -A tasks beat

運(yùn)行主程序:

python main.py

2. 系統(tǒng)監(jiān)控

主程序運(yùn)行后會(huì)自動(dòng)執(zhí)行以下操作:

  • 實(shí)時(shí)顯示系統(tǒng)監(jiān)控?cái)?shù)據(jù)
  • 執(zhí)行常規(guī)任務(wù)示例
  • 通過按下 Ctrl+C 可以優(yōu)雅退出程序

錯(cuò)誤處理

系統(tǒng)實(shí)現(xiàn)了完整的錯(cuò)誤處理機(jī)制:

  • 任務(wù)執(zhí)行錯(cuò)誤捕獲和日志記錄
  • 優(yōu)雅的程序退出處理
  • 自動(dòng)重試機(jī)制

注意事項(xiàng)

Redis連接配置需要根據(jù)實(shí)際環(huán)境修改

確保系統(tǒng)時(shí)區(qū)設(shè)置正確

建議在生產(chǎn)環(huán)境中調(diào)整工作進(jìn)程數(shù)

監(jiān)控?cái)?shù)據(jù)目前為模擬數(shù)據(jù),實(shí)際使用時(shí)需要替換為真實(shí)的系統(tǒng)監(jiān)控指標(biāo)

代碼示例

任務(wù)執(zhí)行示例:

# 執(zhí)行加法任務(wù)
result = add.delay(4, 6)
print(f"任務(wù)ID: {result.id}")
if result.ready():
    print(f"結(jié)果: {result.get()}")

系統(tǒng)監(jiān)控示例

main.py

# 執(zhí)行系統(tǒng)監(jiān)控
result = system_monitor.delay()
data = result.get()
print(f"CPU使用率: {data['cpu_usage']:.1f}%")
print(f"內(nèi)存使用率: {data['memory_usage']:.1f}%")
### 所有代碼:

```python
from tasks import add, multiply, process_text, generate_report, chain_calculation, system_monitor
import time
import json
from datetime import datetime
import threading
import signal
import sys
from celery.result import AsyncResult

# 全局變量控制程序運(yùn)行
running = True

def signal_handler(signum, frame):
    """處理退出信號(hào)"""
    global running
    print("\n收到退出信號(hào),正在關(guān)閉程序...")
    running = False

def monitor_system_task():
    """監(jiān)控系統(tǒng)任務(wù)的執(zhí)行結(jié)果"""
    while running:
        try:
            # 執(zhí)行系統(tǒng)監(jiān)控任務(wù)
            result = system_monitor.delay()
            
            # 等待結(jié)果(最多等待4秒)
            for _ in range(4):
                if result.ready():
                    data = result.get()
                    if data:
                        print(f"\n系統(tǒng)監(jiān)控結(jié)果:")
                        print(f"CPU使用率: {data['cpu_usage']:.1f}%")
                        print(f"內(nèi)存使用率: {data['memory_usage']:.1f}%")
                        print(f"系統(tǒng)狀態(tài): {data['status']}")
                        print("-" * 50)
                    break
                time.sleep(1)
            
            # 等待剩余時(shí)間,確保大約每5秒執(zhí)行一次
            time.sleep(1)
            
        except Exception as e:
            print(f"監(jiān)控任務(wù)出錯(cuò): {e}")
            time.sleep(5)

def run_regular_task():
    """運(yùn)行普通任務(wù)的示例"""
    while running:
        try:
            # 執(zhí)行一些常規(guī)任務(wù)
            print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 執(zhí)行常規(guī)任務(wù)...")
            
            # 1. 執(zhí)行加法任務(wù)
            result = add.delay(4, 6)
            print(f"加法任務(wù)ID: {result.id}")
            if result.ready():
                print(f"4 + 6 = {result.get()}")
            
            # 2. 執(zhí)行文本處理
            text = f"這是一條測(cè)試消息 - {datetime.now()}"
            result = process_text.delay(text)
            print(f"文本處理任務(wù)ID: {result.id}")
            if result.ready():
                print(json.dumps(result.get(), indent=2, ensure_ascii=False))
            
            # 休眠5秒后繼續(xù)下一輪
            for _ in range(5):
                if not running:
                    break
                time.sleep(1)
                
        except Exception as e:
            print(f"執(zhí)行任務(wù)時(shí)出錯(cuò): {e}")
            time.sleep(5)

def main():
    """主函數(shù)"""
    # 注冊(cè)信號(hào)處理器(用于優(yōu)雅退出)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    print("程序啟動(dòng)...")
    print("提示:按 Ctrl+C 可以優(yōu)雅退出程序")
    print("\n=== 主程序開始運(yùn)行 ===")
    print("- 系統(tǒng)監(jiān)控每5秒執(zhí)行一次")
    print("- 常規(guī)任務(wù)每5秒執(zhí)行一次")
    print("- 所有任務(wù)的執(zhí)行結(jié)果會(huì)實(shí)時(shí)顯示")
    
    try:
        # 創(chuàng)建并啟動(dòng)監(jiān)控線程
        monitor_thread = threading.Thread(target=monitor_system_task)
        monitor_thread.daemon = True
        monitor_thread.start()
        
        # 創(chuàng)建并啟動(dòng)常規(guī)任務(wù)線程
        task_thread = threading.Thread(target=run_regular_task)
        task_thread.daemon = True
        task_thread.start()
        
        # 主線程保持運(yùn)行
        while running:
            time.sleep(1)
            
    except KeyboardInterrupt:
        print("\n程序正在關(guān)閉...")
    finally:
        print("程序已退出。")

if __name__ == "__main__":
    main()

tasks.py

from celery import Celery
from celery.schedules import crontab
import time
from datetime import datetime
import random

# 創(chuàng)建 Celery 實(shí)例
app = Celery('tasks')

# 配置 Celery
app.conf.update(
    broker_url='redis://:123456@127.0.0.1:6379/1',
    result_backend='redis://:123456@127.0.0.1:6379/1',
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='Asia/Shanghai',
    enable_utc=True,
    worker_pool_restarts=True,
    worker_concurrency=1,
)

# 配置定時(shí)任務(wù)
app.conf.beat_schedule = {
    # 每5秒執(zhí)行一次系統(tǒng)監(jiān)控
    'monitor-every-5-seconds': {
        'task': 'tasks.system_monitor',
        'schedule': 5.0,  # 每5秒執(zhí)行一次
    },
    # 每天早上9點(diǎn)執(zhí)行
    'daily-morning-report': {
        'task': 'tasks.daily_report',
        'schedule': crontab(hour=9, minute=0),
    },
    # 工作日每小時(shí)執(zhí)行
    'workday-hourly-task': {
        'task': 'tasks.workday_task',
        'schedule': crontab(hour='9-18', minute=0, day_of_week='1-5'),
    }
}

@app.task
def add(x, y):
    """簡單的加法任務(wù)"""
    time.sleep(1)
    return x + y

@app.task
def multiply(x, y):
    """乘法運(yùn)算任務(wù)"""
    time.sleep(2)
    return x * y

@app.task
def process_text(text):
    """文本處理任務(wù)"""
    time.sleep(1)
    result = {
        'original': text,
        'upper': text.upper(),
        'length': len(text),
        'words': len(text.split())
    }
    return result

@app.task
def generate_report():
    """生成報(bào)告任務(wù)"""
    time.sleep(3)
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    data = {
        'timestamp': current_time,
        'temperature': random.uniform(20, 30),
        'humidity': random.uniform(40, 80),
        'status': random.choice(['正常', '警告', '錯(cuò)誤'])
    }
    return data

@app.task
def chain_calculation(numbers):
    """鏈?zhǔn)接?jì)算任務(wù)"""
    time.sleep(2)
    result = sum(numbers)
    average = result / len(numbers)
    maximum = max(numbers)
    minimum = min(numbers)
    return {
        'sum': result,
        'average': average,
        'max': maximum,
        'min': minimum,
        'count': len(numbers)
    }

@app.task
def system_monitor():
    """每5秒執(zhí)行一次的系統(tǒng)監(jiān)控任務(wù)"""

    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"\n[{current_time}] 每5秒執(zhí)行一次的系統(tǒng)監(jiān)控任務(wù) 執(zhí)行系統(tǒng)監(jiān)控...")
    
    # 模擬獲取系統(tǒng)信息
    data = {
        'timestamp': current_time,
        'cpu_usage': random.uniform(0, 100),
        'memory_usage': random.uniform(0, 100),
        'status': 'running'
    }
    
    # 打印監(jiān)控信息
    print(f"CPU使用率: {data['cpu_usage']:.1f}%")
    print(f"內(nèi)存使用率: {data['memory_usage']:.1f}%")
    print(f"系統(tǒng)狀態(tài): {data['status']}")
    print("-" * 50)
    
    return data

@app.task
def daily_report():
    """每天早上9點(diǎn)執(zhí)行的日?qǐng)?bào)任務(wù)"""
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"\n[{current_time}] 生成每日?qǐng)?bào)告...")
    
    report = {
        'report_type': '日?qǐng)?bào)',
        'generated_at': current_time,
        'summary': '這是一個(gè)自動(dòng)生成的日?qǐng)?bào)示例',
        'metrics': {
            'total_tasks': random.randint(100, 1000),
            'completed_tasks': random.randint(50, 500),
            'success_rate': random.uniform(0.8, 1.0)
        }
    }
    
    print(f"報(bào)告類型: {report['report_type']}")
    print(f"生成時(shí)間: {report['generated_at']}")
    print(f"任務(wù)完成率: {report['metrics']['success_rate']:.1%}")
    print("-" * 50)
    
    return report

@app.task
def workday_task():
    """工作日每小時(shí)執(zhí)行的任務(wù)"""
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"\n[{current_time}] 執(zhí)行工作時(shí)間任務(wù)...")
    
    data = {
        'task_type': '工作時(shí)間任務(wù)',
        'executed_at': current_time,
        'status': random.choice(['完成', '進(jìn)行中', '計(jì)劃中']),
        'workload': random.randint(1, 100)
    }
    
    print(f"任務(wù)狀態(tài): {data['status']}")
    print(f"工作負(fù)載: {data['workload']}%")
    print("-" * 50)
    
    return data

以上就是淺析Python如何實(shí)現(xiàn)Celery任務(wù)隊(duì)列系統(tǒng)的詳細(xì)內(nèi)容,更多關(guān)于Python Celery任務(wù)隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • python str字符串轉(zhuǎn)uuid實(shí)例

    python str字符串轉(zhuǎn)uuid實(shí)例

    這篇文章主要介紹了python str字符串轉(zhuǎn)uuid實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-03-03
  • Pygame Transform圖像變形的實(shí)現(xiàn)示例

    Pygame Transform圖像變形的實(shí)現(xiàn)示例

    pygame.transform 模塊允許您對(duì)加載、創(chuàng)建后的圖像進(jìn)行一系列操作,比如調(diào)整圖像大小、旋轉(zhuǎn)圖片等操作,感興趣的可以了解一下
    2021-11-11
  • PyCharm基礎(chǔ)調(diào)試的教程

    PyCharm基礎(chǔ)調(diào)試的教程

    PyCharm作為Python開發(fā)的集成環(huán)境,提供了全面的調(diào)試工具,文中通過圖文及示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-11-11
  • Django 實(shí)現(xiàn)外鍵去除自動(dòng)添加的后綴‘_id’

    Django 實(shí)現(xiàn)外鍵去除自動(dòng)添加的后綴‘_id’

    今天小編就為大家分享一篇Django 實(shí)現(xiàn)外鍵去除自動(dòng)添加的后綴‘_id’,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2019-11-11
  • python使用paramiko模塊通過ssh2協(xié)議對(duì)交換機(jī)進(jìn)行配置的方法

    python使用paramiko模塊通過ssh2協(xié)議對(duì)交換機(jī)進(jìn)行配置的方法

    今天小編就為大家分享一篇python使用paramiko模塊通過ssh2協(xié)議對(duì)交換機(jī)進(jìn)行配置的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2019-07-07
  • pycharm打包py項(xiàng)目為.exe可執(zhí)行文件的兩種方式

    pycharm打包py項(xiàng)目為.exe可執(zhí)行文件的兩種方式

    本文主要介紹了pycharm打包py項(xiàng)目為.exe可執(zhí)行文件的兩種方式,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-01-01
  • pyCharm 實(shí)現(xiàn)關(guān)閉代碼檢查

    pyCharm 實(shí)現(xiàn)關(guān)閉代碼檢查

    這篇文章主要介紹了pyCharm 實(shí)現(xiàn)關(guān)閉代碼檢查,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-06-06
  • Python批量啟動(dòng)多線程代碼實(shí)例

    Python批量啟動(dòng)多線程代碼實(shí)例

    這篇文章主要介紹了python批量啟動(dòng)多線程代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-02-02
  • django反向解析URL和URL命名空間的方法

    django反向解析URL和URL命名空間的方法

    這篇文章主要介紹了django反向解析URL和URL命名空間,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-06-06
  • python 通過SSHTunnelForwarder隧道連接redis的方法

    python 通過SSHTunnelForwarder隧道連接redis的方法

    今天小編就為大家分享一篇python 通過SSHTunnelForwarder隧道連接redis的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2019-02-02

最新評(píng)論