欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python FastAPI+Celery+RabbitMQ實(shí)現(xiàn)分布式圖片水印處理系統(tǒng)

 更新時(shí)間:2025年04月04日 09:23:59   作者:老大白菜  
這篇文章主要為大家詳細(xì)介紹了Python FastAPI如何結(jié)合Celery以及RabbitMQ實(shí)現(xiàn)簡單的分布式圖片水印處理系統(tǒng),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下

實(shí)現(xiàn)思路

  • FastAPI 服務(wù)器
  • Celery 任務(wù)隊(duì)列
  • RabbitMQ 作為消息代理
  • 定時(shí)任務(wù)處理

完整步驟

首先創(chuàng)建項(xiàng)目結(jié)構(gòu):

c:\Users\Administrator\Desktop\meitu\
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── celery_app.py
│   ├── tasks.py
│   └── config.py
├── requirements.txt
└── celery_worker.py

1.首先創(chuàng)建 requirements.txt:

fastapi==0.104.1
uvicorn==0.24.0
celery==5.3.4
python-dotenv==1.0.0
requests==2.31.0

2.創(chuàng)建配置文件:

from dotenv import load_dotenv
import os

load_dotenv()

# RabbitMQ配置
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", "5672")
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")

# Celery配置
CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}//"
CELERY_RESULT_BACKEND = "rpc://"

# 定時(shí)任務(wù)配置
CELERY_BEAT_SCHEDULE = {
    'process-images-every-hour': {
        'task': 'app.tasks.process_images',
        'schedule': 3600.0,  # 每小時(shí)執(zhí)行一次
    },
    'daily-cleanup': {
        'task': 'app.tasks.cleanup_old_images',
        'schedule': 86400.0,  # 每天執(zhí)行一次
    }
}

3.創(chuàng)建 Celery 應(yīng)用:

from celery import Celery
from app.config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, CELERY_BEAT_SCHEDULE

celery_app = Celery(
    'image_processing',
    broker=CELERY_BROKER_URL,
    backend=CELERY_RESULT_BACKEND,
    include=['app.tasks']
)

# 配置定時(shí)任務(wù)
celery_app.conf.beat_schedule = CELERY_BEAT_SCHEDULE
celery_app.conf.timezone = 'Asia/Shanghai'

4.創(chuàng)建任務(wù)文件:

from app.celery_app import celery_app
from app.watermark import ImageWatermarker
import os
from datetime import datetime, timedelta

@celery_app.task
def add_watermark_task(image_path, text, position='center'):
    """異步添加水印任務(wù)"""
    watermarker = ImageWatermarker()
    try:
        result_path = watermarker.add_watermark(
            image_path=image_path,
            text=text,
            position=position
        )
        return {"status": "success", "output_path": result_path}
    except Exception as e:
        return {"status": "error", "message": str(e)}

@celery_app.task
def process_images():
    """定時(shí)處理圖片任務(wù)"""
    image_dir = "images/pending"
    if not os.path.exists(image_dir):
        return {"status": "error", "message": "Pending directory not found"}

    processed = 0
    for image in os.listdir(image_dir):
        if image.lower().endswith(('.png', '.jpg', '.jpeg')):
            add_watermark_task.delay(
                os.path.join(image_dir, image),
                "自動(dòng)處理水印",
                'center'
            )
            processed += 1

    return {"status": "success", "processed": processed}

@celery_app.task
def cleanup_old_images():
    """清理舊圖片任務(wù)"""
    output_dir = "images/processed"
    if not os.path.exists(output_dir):
        return {"status": "error", "message": "Output directory not found"}

    threshold_date = datetime.now() - timedelta(days=7)
    cleaned = 0

    for image in os.listdir(output_dir):
        image_path = os.path.join(output_dir, image)
        if os.path.getctime(image_path) < threshold_date.timestamp():
            os.remove(image_path)
            cleaned += 1

    return {"status": "success", "cleaned": cleaned}

5.創(chuàng)建 FastAPI 應(yīng)用:

from fastapi import FastAPI, File, UploadFile, BackgroundTasks
from fastapi.responses import JSONResponse
import os
from app.tasks import add_watermark_task
from app.celery_app import celery_app

app = FastAPI(title="圖片水印處理服務(wù)")

@app.post("/upload/")
async def upload_image(
    file: UploadFile = File(...),
    text: str = "水印文本",
    position: str = "center"
):
    # 保存上傳的文件
    file_path = f"images/uploads/{file.filename}"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    
    with open(file_path, "wb") as buffer:
        content = await file.read()
        buffer.write(content)
    
    # 創(chuàng)建異步任務(wù)
    task = add_watermark_task.delay(file_path, text, position)
    
    return JSONResponse({
        "status": "success",
        "message": "圖片已上傳并加入處理隊(duì)列",
        "task_id": task.id
    })

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    task = celery_app.AsyncResult(task_id)
    if task.ready():
        return {"status": "completed", "result": task.result}
    return {"status": "processing"}

@app.get("/tasks/scheduled")
async def get_scheduled_tasks():
    return {"tasks": celery_app.conf.beat_schedule}

6.創(chuàng)建 Celery worker 啟動(dòng)文件:

from app.celery_app import celery_app

if __name__ == '__main__':
    celery_app.start()

使用說明

首先安裝依賴:

pip install -r requirements.txt

確保 RabbitMQ 服務(wù)已啟動(dòng)

啟動(dòng) FastAPI 服務(wù)器:

uvicorn app.main:app --reload

啟動(dòng) Celery worker:

celery -A celery_worker.celery_app worker --loglevel=info

啟動(dòng) Celery beat(定時(shí)任務(wù)):

celery -A celery_worker.celery_app beat --loglevel=info

這個(gè)系統(tǒng)提供以下功能:

  • 通過 FastAPI 接口上傳圖片并異步處理水印
  • 使用 Celery 處理異步任務(wù)隊(duì)列
  • 使用 RabbitMQ 作為消息代理
  • 支持定時(shí)任務(wù):
    • 每小時(shí)自動(dòng)處理待處理圖片
    • 每天清理一周前的舊圖片
  • 支持任務(wù)狀態(tài)查詢
  • 支持查看計(jì)劃任務(wù)列表

API 端點(diǎn):

  • POST /upload/ - 上傳圖片并創(chuàng)建水印任務(wù)
  • GET /task/{task_id} - 查詢?nèi)蝿?wù)狀態(tài)
  • GET /tasks/scheduled - 查看計(jì)劃任務(wù)列表

以上就是Python FastAPI+Celery+RabbitMQ實(shí)現(xiàn)分布式圖片水印處理系統(tǒng)的詳細(xì)內(nèi)容,更多關(guān)于Python圖片水印的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 基于pytorch 預(yù)訓(xùn)練的詞向量用法詳解

    基于pytorch 預(yù)訓(xùn)練的詞向量用法詳解

    今天小編就為大家分享一篇基于pytorch 預(yù)訓(xùn)練的詞向量用法詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-01-01
  • 詳解Python中的Dict(下篇)

    詳解Python中的Dict(下篇)

    這篇文章主要為大家介紹了Python中的Dict,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2021-12-12
  • 100 個(gè) Python 小例子(練習(xí)題三)

    100 個(gè) Python 小例子(練習(xí)題三)

    這篇文章主要給大家分享的是100 個(gè) Python 小例子,前期已經(jīng)給大家分過100個(gè)小例子的(一)和(二),今天小編繼續(xù)和大家分享(三),希望歲正在學(xué)習(xí)的你有所幫助
    2022-01-01
  • python實(shí)現(xiàn)微秒級(jí)等待問題(windows)

    python實(shí)現(xiàn)微秒級(jí)等待問題(windows)

    這篇文章主要介紹了python實(shí)現(xiàn)微秒級(jí)等待問題(windows),具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-06-06
  • python入門前的第一課 python怎樣入門

    python入門前的第一課 python怎樣入門

    人工智能這么火,0基礎(chǔ)能學(xué)python嗎?python該怎么選擇編輯器?怎么搭建python運(yùn)行環(huán)境?python好學(xué)嗎,怎么學(xué)?這是所有python入門前同學(xué)都會(huì)提出的疑問,這篇文章和大家一起學(xué)習(xí)python,感興趣的小伙伴們可以加入
    2018-03-03
  • Python腳本實(shí)現(xiàn)蝦米網(wǎng)簽到功能

    Python腳本實(shí)現(xiàn)蝦米網(wǎng)簽到功能

    這篇文章主要介紹了Python腳本實(shí)現(xiàn)蝦米網(wǎng)簽到功能的方法,涉及Python調(diào)用URL模塊實(shí)現(xiàn)數(shù)據(jù)傳輸與處理的相關(guān)技巧,需要的朋友可以參考下
    2016-04-04
  • Python自制一個(gè)PDF轉(zhuǎn)PNG圖片小工具

    Python自制一個(gè)PDF轉(zhuǎn)PNG圖片小工具

    這篇文章主要為大家詳細(xì)介紹了如何利用Python中的PyQt5自制一個(gè)PDF轉(zhuǎn)PNG格式圖片的小工具,文中的示例代碼講解詳細(xì),感興趣的可以了解一下
    2023-02-02
  • 快速了解Python相對(duì)導(dǎo)入

    快速了解Python相對(duì)導(dǎo)入

    這篇文章主要介紹了快速了解Python相對(duì)導(dǎo)入,具有一定借鑒價(jià)值,需要的朋友可以參考下
    2018-01-01
  • Python使用Matplotlib模塊時(shí)坐標(biāo)軸標(biāo)題中文及各種特殊符號(hào)顯示方法

    Python使用Matplotlib模塊時(shí)坐標(biāo)軸標(biāo)題中文及各種特殊符號(hào)顯示方法

    這篇文章主要介紹了Python使用Matplotlib模塊時(shí)坐標(biāo)軸標(biāo)題中文及各種特殊符號(hào)顯示方法,結(jié)合具體實(shí)例分析了Python使用Matplotlib模塊過程中針對(duì)中文及特殊符號(hào)的顯示方法,需要的朋友可以參考下
    2018-05-05
  • Python 移動(dòng)光標(biāo)位置的方法

    Python 移動(dòng)光標(biāo)位置的方法

    今天小編就為大家分享一篇Python 移動(dòng)光標(biāo)位置的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2019-01-01

最新評(píng)論