Python FastAPI+Celery+RabbitMQ實(shí)現(xiàn)分布式圖片水印處理系統(tǒng)
實(shí)現(xiàn)思路
- FastAPI 服務(wù)器
- Celery 任務(wù)隊(duì)列
- RabbitMQ 作為消息代理
- 定時(shí)任務(wù)處理
完整步驟
首先創(chuàng)建項(xiàng)目結(jié)構(gòu):
c:\Users\Administrator\Desktop\meitu\
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── celery_app.py
│ ├── tasks.py
│ └── config.py
├── requirements.txt
└── celery_worker.py
1.首先創(chuàng)建 requirements.txt:
fastapi==0.104.1 uvicorn==0.24.0 celery==5.3.4 python-dotenv==1.0.0 requests==2.31.0
2.創(chuàng)建配置文件:
from dotenv import load_dotenv import os load_dotenv() # RabbitMQ配置 RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost") RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", "5672") RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest") RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest") # Celery配置 CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}//" CELERY_RESULT_BACKEND = "rpc://" # 定時(shí)任務(wù)配置 CELERY_BEAT_SCHEDULE = { 'process-images-every-hour': { 'task': 'app.tasks.process_images', 'schedule': 3600.0, # 每小時(shí)執(zhí)行一次 }, 'daily-cleanup': { 'task': 'app.tasks.cleanup_old_images', 'schedule': 86400.0, # 每天執(zhí)行一次 } }
3.創(chuàng)建 Celery 應(yīng)用:
from celery import Celery from app.config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, CELERY_BEAT_SCHEDULE celery_app = Celery( 'image_processing', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND, include=['app.tasks'] ) # 配置定時(shí)任務(wù) celery_app.conf.beat_schedule = CELERY_BEAT_SCHEDULE celery_app.conf.timezone = 'Asia/Shanghai'
4.創(chuàng)建任務(wù)文件:
from app.celery_app import celery_app from app.watermark import ImageWatermarker import os from datetime import datetime, timedelta @celery_app.task def add_watermark_task(image_path, text, position='center'): """異步添加水印任務(wù)""" watermarker = ImageWatermarker() try: result_path = watermarker.add_watermark( image_path=image_path, text=text, position=position ) return {"status": "success", "output_path": result_path} except Exception as e: return {"status": "error", "message": str(e)} @celery_app.task def process_images(): """定時(shí)處理圖片任務(wù)""" image_dir = "images/pending" if not os.path.exists(image_dir): return {"status": "error", "message": "Pending directory not found"} processed = 0 for image in os.listdir(image_dir): if image.lower().endswith(('.png', '.jpg', '.jpeg')): add_watermark_task.delay( os.path.join(image_dir, image), "自動(dòng)處理水印", 'center' ) processed += 1 return {"status": "success", "processed": processed} @celery_app.task def cleanup_old_images(): """清理舊圖片任務(wù)""" output_dir = "images/processed" if not os.path.exists(output_dir): return {"status": "error", "message": "Output directory not found"} threshold_date = datetime.now() - timedelta(days=7) cleaned = 0 for image in os.listdir(output_dir): image_path = os.path.join(output_dir, image) if os.path.getctime(image_path) < threshold_date.timestamp(): os.remove(image_path) cleaned += 1 return {"status": "success", "cleaned": cleaned}
5.創(chuàng)建 FastAPI 應(yīng)用:
from fastapi import FastAPI, File, UploadFile, BackgroundTasks from fastapi.responses import JSONResponse import os from app.tasks import add_watermark_task from app.celery_app import celery_app app = FastAPI(title="圖片水印處理服務(wù)") @app.post("/upload/") async def upload_image( file: UploadFile = File(...), text: str = "水印文本", position: str = "center" ): # 保存上傳的文件 file_path = f"images/uploads/{file.filename}" os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, "wb") as buffer: content = await file.read() buffer.write(content) # 創(chuàng)建異步任務(wù) task = add_watermark_task.delay(file_path, text, position) return JSONResponse({ "status": "success", "message": "圖片已上傳并加入處理隊(duì)列", "task_id": task.id }) @app.get("/task/{task_id}") async def get_task_status(task_id: str): task = celery_app.AsyncResult(task_id) if task.ready(): return {"status": "completed", "result": task.result} return {"status": "processing"} @app.get("/tasks/scheduled") async def get_scheduled_tasks(): return {"tasks": celery_app.conf.beat_schedule}
6.創(chuàng)建 Celery worker 啟動(dòng)文件:
from app.celery_app import celery_app if __name__ == '__main__': celery_app.start()
使用說明
首先安裝依賴:
pip install -r requirements.txt
確保 RabbitMQ 服務(wù)已啟動(dòng)
啟動(dòng) FastAPI 服務(wù)器:
uvicorn app.main:app --reload
啟動(dòng) Celery worker:
celery -A celery_worker.celery_app worker --loglevel=info
啟動(dòng) Celery beat(定時(shí)任務(wù)):
celery -A celery_worker.celery_app beat --loglevel=info
這個(gè)系統(tǒng)提供以下功能:
- 通過 FastAPI 接口上傳圖片并異步處理水印
- 使用 Celery 處理異步任務(wù)隊(duì)列
- 使用 RabbitMQ 作為消息代理
- 支持定時(shí)任務(wù):
- 每小時(shí)自動(dòng)處理待處理圖片
- 每天清理一周前的舊圖片
- 支持任務(wù)狀態(tài)查詢
- 支持查看計(jì)劃任務(wù)列表
API 端點(diǎn):
- POST /upload/ - 上傳圖片并創(chuàng)建水印任務(wù)
- GET /task/{task_id} - 查詢?nèi)蝿?wù)狀態(tài)
- GET /tasks/scheduled - 查看計(jì)劃任務(wù)列表
以上就是Python FastAPI+Celery+RabbitMQ實(shí)現(xiàn)分布式圖片水印處理系統(tǒng)的詳細(xì)內(nèi)容,更多關(guān)于Python圖片水印的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
基于pytorch 預(yù)訓(xùn)練的詞向量用法詳解
今天小編就為大家分享一篇基于pytorch 預(yù)訓(xùn)練的詞向量用法詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-01-01100 個(gè) Python 小例子(練習(xí)題三)
這篇文章主要給大家分享的是100 個(gè) Python 小例子,前期已經(jīng)給大家分過100個(gè)小例子的(一)和(二),今天小編繼續(xù)和大家分享(三),希望歲正在學(xué)習(xí)的你有所幫助2022-01-01python實(shí)現(xiàn)微秒級(jí)等待問題(windows)
這篇文章主要介紹了python實(shí)現(xiàn)微秒級(jí)等待問題(windows),具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06Python腳本實(shí)現(xiàn)蝦米網(wǎng)簽到功能
這篇文章主要介紹了Python腳本實(shí)現(xiàn)蝦米網(wǎng)簽到功能的方法,涉及Python調(diào)用URL模塊實(shí)現(xiàn)數(shù)據(jù)傳輸與處理的相關(guān)技巧,需要的朋友可以參考下2016-04-04Python自制一個(gè)PDF轉(zhuǎn)PNG圖片小工具
這篇文章主要為大家詳細(xì)介紹了如何利用Python中的PyQt5自制一個(gè)PDF轉(zhuǎn)PNG格式圖片的小工具,文中的示例代碼講解詳細(xì),感興趣的可以了解一下2023-02-02Python使用Matplotlib模塊時(shí)坐標(biāo)軸標(biāo)題中文及各種特殊符號(hào)顯示方法
這篇文章主要介紹了Python使用Matplotlib模塊時(shí)坐標(biāo)軸標(biāo)題中文及各種特殊符號(hào)顯示方法,結(jié)合具體實(shí)例分析了Python使用Matplotlib模塊過程中針對(duì)中文及特殊符號(hào)的顯示方法,需要的朋友可以參考下2018-05-05