Python使用fastapi快速編寫一個增刪改查的接口
python用fastapi快速寫一個增刪改查的接口
from fastapi import FastAPI, HTTPException from pydantic import BaseModel from typing import Dict app = FastAPI() # Mock database db = {} # Model for the data class Item(BaseModel): name: str description: str # Create operation @app.post("/items/") def create_item(item: Item): if item.name in db: raise HTTPException(status_code=400, detail="Item already exists") db[item.name] = item.description return {"message": "Item created successfully"} # Read operation @app.get("/items/{name}") def read_item(name: str): if name not in db: raise HTTPException(status_code=404, detail="Item not found") return {"name": name, "description": db[name]} # Update operation @app.put("/items/{name}") def update_item(name: str, item: Item): if name not in db: raise HTTPException(status_code=404, detail="Item not found") db[name] = item.description return {"message": "Item updated successfully"} # Delete operation @app.delete("/items/{name}") def delete_item(name: str): if name not in db: raise HTTPException(status_code=404, detail="Item not found") del db[name] return {"message": "Item deleted successfully"}
這段代碼設(shè)置了一個FastAPI應(yīng)用程序,其中包含用于創(chuàng)建、讀取、更新和刪除物品的端點(diǎn)。數(shù)據(jù)以簡單的內(nèi)存數(shù)據(jù)庫形式存儲在字典(db)中。您可以使用諸如curl、Postman或任何其他HTTP客戶端之類的工具來測試這些端點(diǎn)。
方法補(bǔ)充
除了上文的方法,小編還為大家整理了其他FastAPI實(shí)現(xiàn)高效的增刪改查操作的方法,希望對大家有所幫助
環(huán)境搭建
在開始之前,請確保你的環(huán)境中安裝了Python 3.6+和pip。使用以下命令安裝FastAPI和Uvicorn,Uvicorn是一個輕量級的ASGI服務(wù)器。
pip install fastapi uvicorn
快速啟動
創(chuàng)建一個main.py文件,并寫入以下代碼來啟動一個簡單的FastAPI應(yīng)用:
from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
運(yùn)行服務(wù)器:
uvicorn main:app --reload
打開瀏覽器訪問http://127.0.0.1:8000/,你將看到返回的JSON響應(yīng)。
定義數(shù)據(jù)模型
在實(shí)現(xiàn)CRUD操作之前,需要定義數(shù)據(jù)模型。在models.py文件中,定義一個Pydantic模型,用于請求和響應(yīng)數(shù)據(jù)的驗(yàn)證。
from pydantic import BaseModel # 創(chuàng)建一個Item模型 class Item(BaseModel): id: int name: str description: str = None price: float tax: float = None
創(chuàng)建CRUD操作
在crud.py文件中,實(shí)現(xiàn)CRUD操作的函數(shù)。這里簡單地使用一個字典來存儲數(shù)據(jù),實(shí)際應(yīng)用中應(yīng)該使用數(shù)據(jù)庫。
from models import Item items = {} def create_item(item_id: int, item: Item): items[item_id] = item return items[item_id] def read_item(item_id: int): return items.get(item_id) def update_item(item_id: int, item: Item): if item_id in items: items[item_id] = item return items[item_id] return None def delete_item(item_id: int): if item_id in items: del items[item_id] return True return False
實(shí)現(xiàn)API端點(diǎn)
在main.py中,將使用CRUD操作函數(shù)來實(shí)現(xiàn)API端點(diǎn)。
from fastapi import FastAPI, HTTPException from models import Item from crud import create_item, read_item, update_item, delete_item app = FastAPI() # 創(chuàng)建Item @app.post("/items/{item_id}") async def create(item_id: int, item: Item): return create_item(item_id, item) # 讀取Item @app.get("/items/{item_id}") async def read(item_id: int): item = read_item(item_id) if item is None: raise HTTPException(status_code=404, detail="Item not found") return item # 更新Item @app.put("/items/{item_id}") async def update(item_id: int, item: Item): updated_item = update_item(item_id, item) if updated_item is None: raise HTTPException(status_code=404, detail="Item not found") return updated_item # 刪除Item @app.delete("/items/{item_id}") async def delete(item_id: int): if not delete_item(item_id): raise HTTPException(status_code=404, detail="Item not found") return {"detail": "Item deleted"}
測試API
我們可以使用FastAPI提供的API文檔來測試我們的接口。重新運(yùn)行服務(wù)器:
uvicorn main:app --reload
轉(zhuǎn)到http://127.0.0.1:8000/docs,你將看到Swagger UI,一個自動生成的交互式API文檔,你可以在這里測試你的API端點(diǎn)。
進(jìn)階:使用數(shù)據(jù)庫
為了讓CRUD操作更加實(shí)際,我們將使用SQLite數(shù)據(jù)庫來存儲數(shù)據(jù)。首先,我們需要安裝databases和sqlalchemy。
pip install databases sqlalchemy
接下來,在database.py文件中設(shè)置數(shù)據(jù)庫連接和表:
import databases import sqlalchemy from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float DATABASE_URL = "sqlite:///./test.db" database = databases.Database(DATABASE_URL) metadata = MetaData() items = Table( "items", metadata, Column("id", Integer, primary_key=True), Column("name", String(50)), Column("description", String(50)), Column("price", Float), Column("tax", Float, default=None) ) engine = create_engine(DATABASE_URL) metadata.create_all(engine)
在crud.py中,我們將函數(shù)更新為使用數(shù)據(jù)庫:
from models import Item from database import database, items async def create_item(item_id: int, item: Item): query = items.insert().values(id=item_id, **item.dict()) last_record_id = await database.execute(query) return {**item.dict(), "id": last_record_id} async def read_item(item_id: int): query = items.select().where(items.c.id == item_id) return await database.fetch_one(query) async def update_item(item_id: int, item: Item): query = items.update().where(items.c.id == item_id).values(**item.dict()) await database.execute(query) return await read_item(item_id) async def delete_item(item_id: int): query = items.delete().where(items.c.id == item_id) return await database.execute(query) > 0
最后,需要在main.py
中啟動和關(guān)閉數(shù)據(jù)庫連接:
from fastapi import FastAPI, HTTPException from models import Item from crud import create_item, read_item, update_item, delete_item from database import database app = FastAPI() @app.on_event("startup") async def startup(): await database.connect() @app.on_event("shutdown") async def shutdown(): await database.disconnect() # 其余的API端點(diǎn)保持不變
到此這篇關(guān)于Python使用fastapi快速編寫一個增刪改查的接口的文章就介紹到這了,更多相關(guān)Python fastapi增刪改查內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python 實(shí)現(xiàn)定時任務(wù)的四種方式
這篇文章主要介紹了python 實(shí)現(xiàn)定時任務(wù)的四種方式,幫助大家更好的理解和學(xué)習(xí)使用python,感興趣的朋友可以了解下2021-04-04python自動12306搶票軟件實(shí)現(xiàn)代碼
這篇文章主要為大家詳細(xì)介紹了python自動12306搶票軟件的實(shí)現(xiàn)代碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-02-02Python常見數(shù)據(jù)結(jié)構(gòu)之棧與隊列用法示例
這篇文章主要介紹了Python常見數(shù)據(jù)結(jié)構(gòu)之棧與隊列用法,結(jié)合實(shí)例形式簡單介紹了數(shù)據(jù)結(jié)構(gòu)中棧與隊列的概念、功能及簡單使用技巧,需要的朋友可以參考下2019-01-01Python實(shí)現(xiàn)基本數(shù)據(jù)結(jié)構(gòu)中棧的操作示例
這篇文章主要介紹了Python實(shí)現(xiàn)基本數(shù)據(jù)結(jié)構(gòu)中棧的操作,包括基于Python實(shí)現(xiàn)棧的定義、入棧、出棧、判斷??栈驐M等情況,需要的朋友可以參考下2017-12-12使用Python對IP進(jìn)行轉(zhuǎn)換的一些操作技巧小結(jié)
這篇文章主要介紹了使用Python對IP進(jìn)行轉(zhuǎn)換的一些操作技巧小結(jié),包括使用socket模塊里的相關(guān)函數(shù)和匿名函數(shù)實(shí)現(xiàn),需要的朋友可以參考下2015-11-11Python實(shí)現(xiàn)字符串格式化的方法小結(jié)
本篇文章主要介紹了Python實(shí)現(xiàn)字符串格式化的方法小結(jié),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-02-02