文章

FastAPI 开发网络数据接口

FastAPI 是一个现代的、高性能的 Web 框架,用于快速构建 API 应用。它基于 Python 的标准类型提示,并通过异步特性和数据验证功能,显著提升开发体验和效率。

关键特性:

  • 快速:可与 NodeJS 和 Go 并肩的极高性能(归功于 Starlette 和 Pydantic)。最快的 Python web 框架之一
  • 高效编码:提高功能开发速度约 200% 至 300%。*
  • 更少 bug:减少约 40% 的人为(开发者)导致错误。*
  • 智能:极佳的编辑器支持。处处皆可自动补全,减少调试时间。
  • 简单:设计的易于使用和学习,阅读文档的时间更短。
  • 简短:使代码重复最小化。通过不同的参数声明实现丰富功能。bug 更少。
  • 健壮:生产可用级别的代码。还有自动生成的交互式文档。
  • 标准化:基于(并完全兼容)API 的相关开放标准:OpenAPI (以前被称为 Swagger) 和 JSON Schema

FastAPI 构建在两个关键库之上,分别负责不同的核心功能:

  • Starlette:专注于 Web 应用部分,提供了 ASGI 框架基础,提供高性能的 HTTP 请求处理。
  • Pydantic:专注于数据验证和解析,基于 Python 类型提示进行模型验证和数据转换。

FastAPI 快速上手

创建项目

创建一个项目名称为 fastAPI。

安装三方库

pip install fastapi

在 FastAPI 中,虽然框架本身提供了强大的路由、数据验证和 HTTP 请求处理功能,但仍然需要一个 ASGI(异步服务器网关接口)服务器来运行应用程序。在生产环境中可以使用 Uvicorn 或者 Hypercorn

pip install "uvicorn[standard]"

示例

创建应用

使用 FastAPI 创建一个简单的 Web API 应用。

# fastAPI/test/main.py

from typing import Union

from fastapi import FastAPI

app = FastAPI()  # ASGI 应用,可以用 Uvicorn 等服务器运行。


@app.get("/")  # 定义一个处理根路径 `/` 的 GET 请求的处理函数。
def read_root():
    return {"Hello": "World"}


@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
    return {"item_id": item_id, "q": q}

说明:

  • 从 Python 的 typing 模块中导入 Union,可以定义是多种类型的参数。
  • 从 FastAPI 框架中导入 FastAPI 类,用于创建 FastAPI 应用实例。
  • q: Union[str, None] = Noneq 是一个查询参数,可以是字符串或 None 类型,默认为 None

运行 FastAPI 应用

通过以下命令运行服务器:

(venv) PS D:\PycharmProjects\fastAPI> uvicorn test.main:app --reload   
INFO:     Will watch for changes in these directories: ['D:\\PycharmProjects\\fastAPI']
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [18812] using WatchFiles              
INFO:     Started server process [18344]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     127.0.0.1:59193 - "GET / HTTP/1.1" 200 OK

说明:运行 unicorn 时指定从参数--reload,在代码发生变更时自动重新加载代码内容。

访问服务接口

  1. 根路径:访问 http://127.0.0.1:8000/,返回响应结果:
{"Hello": "World"}
  1. 动态路径:访问 http://127.0.0.1:8000/items/42?q=example,将返回响应结果:
{"item_id": 42, "q": "example"}

查看文档

访问交互式 API 文档

可以访问 http://127.0.0.1:8000/docs,自动生成的交互式 API 文档(由 Swagger UI生成)。

Swagger UI 是一个交互式的 API 文档工具,允许用户在浏览 API 文档的同时,直接在界面上发送请求并查看响应。

填写请求参数,执行返回响应结果:

Pasted image 20240624075252.png

可选的 API 文档

访问 http://127.0.0.1:8000/redoc。可以看到另一个自动生成的文档(由 ReDoc 生成)。

ReDoc 生成的 API 文档主要用于查看和浏览,不支持直接编辑请求体和发送请求。

Pasted image 20240624081247.png

示例升级

修改 main.py 文件从 PUT 请求中接收请求体。

借助 Pydantic 来使用标准的 Python 类型声明请求体。

# test/main.py

from typing import Union
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


@app.get("/")
def read_root():
    return {"Hello": "World"}


@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
    return {"item_id": item_id, "q": q}


class Itme(BaseModel):
    name: str
    price: float
    is_offer: Union[bool, None] = None


@app.put("/items/{item_id}")
def update_item(item_id: int, item: Itme):
    return {"item_name": item.name, "item_id": item_id}

保存代码,服务器会自动更新加载。

查看 API 文档

访问 http://127.0.0.1:8000/docs

交互式 API 文档将自动更新,并加入新的请求体:

Pasted image 20240624095531.png

点击 Try it out 按钮,可以填写参数并直接调用 API:

Pasted image 20240624095854.png

然后点击 Execute 按钮,用户界面将会和 API 进行通信,返回响应:

Pasted image 20240624100005.png

并发 async / await

有关 async def 语法以及异步代码、并发和并行的一些基础知识。

在 Python 中,如果你使用了某些需要异步通信的三方库(例如处理 HTTP 请求或数据库操作),而你的应用程序在这些任务完成之前无需阻塞等待响应,那么可以将相关函数定义为异步函数(使用 async def),并在调用需要等待的操作时使用 await 关键字。

可以通过 async def 声明的异步函数:

@app.get('/')
async def read_results():
    results = await some_library()
    return results

但有时候与某些组件(如数据库、API、文件系统等)进行通信时,这些组件不支持使用 await,用普通方式 def 声明函数即可。

异步代码

现代版本的 Python 有一种非常直观的方式来定义异步代码,可以使用 asyncawait 语法。使用 aysnc def 定义函数,Python 就知道在该函数中,它遇上 await,可以暂停执行该函数,直至执行其他操作后回来。

结合 asyncawait 使用的异步代码,所有功能大多数被概括为协程[^协程]。它与函数相似,可以启动、执行并返回结果。但不同的是,协程可以在执行过程中暂停,等待异步操作完成后再继续执行。这种暂停和恢复的能力使其特别适合处理 I/O 密集型任务,如网络请求或文件操作。

[^协程]:在 Python 语言中,单线程 + 异步 I/O 的编程模型称为协程

同步方式是指按顺序执行任务,一个任务执行完后再执行下一个任务。这种方式下,每个任务的执行都是阻塞的。

异步方式允许任务在等待的过程中继续执行其他任务,而不是阻塞当前任务。这种方式可以更高效地利用资源,特别是在 I/O 密集型操作(如网络请求、文件读写)中。

这种异步代码的编写思想有时也称之为并发,它不同于并行

并发和并行都或多或少与不同的事情同时发生有关。但它们又有细节性差异。

并行指的是系统在同一时间点真正地同时执行多个任务。这需要多个处理器核心或多个计算单元,允许多个任务在不同的核心上同时执行。

  • 多核处理器:在多核 CPU 或多处理器系统中,每个核心或处理器同时执行不同的任务。
  • 分布式计算:任务在不同的物理机器上同时执行。

并发是系统在同一时间段内处理多个任务。注意,并发并不意味着这些事同时执行的,而是任务在时间上交替进行,使其看起来是同时进行。这通常通过任务之间的快速切换实现。

  • 多任务切换:在单个处理器上,任务交替执行,每个任务执行一小段时间,然后切换到另一个任务。
  • 异步操作:任务可以在等待 I/O 操作时进行切换,从而更有效地利用 CPU 资源。

路由处理函数

在 FastAPI 中,路由处理函数既可以使用 def(同步函数)也可以使用 async def(异步函数)来定义。

  • 同步函数(def:当处理逻辑包含阻塞操作(如同步数据库查询或文件读取)时,FastAPI 会自动将它们放入线程池中执行,以避免阻塞主事件循环。
  • 异步函数(async def:推荐在使用支持异步 I/O 的库时使用异步函数,如异步数据库访问库(asyncpg)或网络请求库(httpx)。异步函数能与主事件循环无缝协作,高效处理并发请求。

函数处理方式

FastAPI 处理方式:

  • 如果通过 def 声明的函数,FastAPI 会自动将其放到线程池中运行。这是为了避免阻塞主事件循环。也就是说,虽然在代码中它表现为直接调用,但底层实际通过线程池异步处理。
  • 如果通过 async def 声明函数,则它会直接在主事件循环中运行,可以利用异步能力。

依赖

这同样适用于依赖。如果一个依赖是标准的 def 函数而不是 async def,它将被运行在外部线程池中。

请求体

FastAPI 使用请求体从客户端(例如浏览器)向 API 发送数据。

请求体是客户端发送给 API 的数据。响应体是 API 发送给客户端的数据。

API 基本上肯定要发送响应体,但是客户端不一定发送请求体

说明:

  • 尽管规范允许 FastAPI 支持通过 GET 方法携带请求体,但这种用法不推荐,且在交互式文档(Swagger UI)中会隐藏。建议遵循规范,将数据通过 URL 参数或使用其他 HTTP 方法传递。
  • 推荐使用场景:
    • 发送数据:使用 POSTPUTPATCH
    • 获取数据:使用 GET,并通过查询参数(query parameters)传递附加信息。

创建数据模型

数据模型声明继承 BaseModel 类,BaseModel来自 Pydantic 库,用于定义可以验证数据和执行序列化的模型。

class Item(BaseModel):
    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None

例如,上述模型声明如下 JSON 对象(即 Python 字典):

{
    "name": "Foo",
    "description": "An optional description",
    "price": 45.2,
    "tax": 3.5
}

声明请求体参数

在 FastAPI 中,你可以使用 Pydantic 模型以与声明路径和查询参数相同的方式声明请求主体。

@app.post("/items/")
async def create_item(item: Item):
    return {"name": item.name, "price": item.price}
  • item: Item 是请求主体,其中 Item 是一个 Pydantic 模型,它将自动解析并验证传入的 JSON 主体。
  • 请求主体的处理方式与路径和查询参数相同,但 FastAPI 使用 Pydantic 模型进行结构化验证和序列化。

使用模型

路由处理函数内部直接访问模型对象的属性。

请求体+路径参数

FastAPI 支持同时声明路径参数和请求体。

FastAPI 能识别与路径参数匹配的函数参数,还能识别从请求体中获取的类型为 Pydantic 模型的函数参数。

@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item):
    return {"item_id": item_id, **item.dict()}

请求体+路径参数+查询参数

FastAPI 支持同时声明请求体、路径参数和查询参数

FastAPI 根据以下规则自动识别处理它们:

  • 路径参数:出现在 URL 路径部分的参数(例如/items/{item_id})。这些被视为路径参数
  • 请求主体:参数是 Pydantic 模型或类,表示客户端发送的请求主体数据。
  • 查询参数:URL 查询字符串中的参数(例如/items?q=value)。这些参数被视为查询参数,其类型可以是 intfloatstrbool 等单类型的参数。

FastAPI 能够识别这三种参数,并从正确的位置获取数据。

@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item, q: Union[str, None] = None):
    return {"item_id": item_id, "name": item.name, "price": item.price, "q": q}

此示例中:

  • item_id: int 是一个路径参数(因为它是 URL 路径的一部分)。
  • item: Item请求主体,预期为与 Pydantic 模型匹配的 JSON 对象 Item
  • q: Union[str, None] = None 是一个查询参数,将作为 URL 的一部分传递,如/items/{item_id}?q=value。(它是用默认值声明的,并且不是主体或路径的一部分)。

说明:默认值为 None 的参数,FastAPI 会将其当作可选参数。

响应模型

可以在任意的路由请求中使用 response_model 参数来声明用于响应的模型:

  • @app.get()
  • @app.post()
  • @app.put()
  • @app.delete()
  • 等等。
from typing import Any, List, Union

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None
    tags: List[str] = []


@app.post("/items/", response_model=Item)
async def create_item(item: Item) -> Any:
    return item


@app.get("/items/", response_model=List[Item])
async def read_items() -> Any:
    return [
        {"name": "Portal Gun", "price": 42.0},
        {"name": "Plumbus", "price": 32.0},
    ]

说明:

  • response_model 是装饰器方法(getpost 等)的一个参数。不像之前的所有参数和请求体,它不属于路由处理函数。
  • response_model 接收的类型与在 Pydantic 模型属性所声明的类型相同,因此它可以是一个 Pydantic 模型,或由 Pydantic 模型组成的列表如 List[Item]
  • -> Any 指示函数的返回值类型。当返回类型不确定或多种可能时,使用 -> Any 可以让函数返回不同类型的数据,具有灵活性。

FastAPI 将使用 response_model 用于:

  • 将输出数据转换为其声明的类型,限制在该模型定义中。
  • 使用响应模型校验数据。
  • 在 OpenAPI 的路由处理中为响应添加一个 JSON Schema,在自动生成文档系统中使用。

响应模型在参数中被声明,而不是作为函数返回类型的注解,这是因为路径函数可能不会真正返回该响应模型,而是返回一个 dict、数据库对象或其他模型,然后再使用 response_model 来执行字段约束和序列化。

返回与输入相同的数据

声明一个 UserIn 模型,它将包含一个明文密码属性。

from typing import Union

from fastapi import FastAPI
from pydantic import BaseModel, EmailStr

app = FastAPI()


class UserIn(BaseModel):
    username: str
    password: str
    email: EmailStr
    full_name: Union[str, None] = None


# Don't do this in production!
@app.post("/user/")
async def create_user(user: UserIn) -> UserIn:
    return user

说明:

  • async def create_user(user: UserIn) -> UserIn:使用 UserIn 模型声明输入数据,并使用同一模型声明输出数据。

当浏览器使用一个密码创建用户时,API 会在响应中返回相同的密码。显然,在响应中发送密码有泄露的风险。

因此,定义不同于输入模型的输出模型。

添加输出模型

现在创建一个有明文密码的输入模型和一个没有明文密码的输出模型。

# test/main.py

from typing import Any, Union

from fastapi import FastAPI
from pydantic import BaseModel, EmailStr

app = FastAPI()


class UserIn(BaseModel):
    username: str
    password: str
    email: EmailStr
    full_name: Union[str, None] = None


class UserOut(BaseModel):
    username: str
    email: EmailStr
    full_name: Union[str, None] = None

@app.post("/user/", response_model=UserOut)
async def create_user(user: UserIn) -> Any:
    return user  #  路由处理函数将返回包含密码的相同输入用户

说明:将 response_model 声明为不包含密码的 UserOut 模型,FastAPI 将会过滤掉未在输出模型中声明的所有数据(使用 Pydantic)。

查看文档

当查看自动化文档时,可以检查输入模型和输出模型是否都具有自己的 JSON Schema:

运行服务:

uvicorn test.main:app --reload

访问 http://127.0.0.1:8000/docs ,检查输入和输出模型 JSON Schema:

Pasted image 20240625071435.png

响应模型编码参数

当在 NoSQL 数据库中使用了具有许多可选属性的模型,返回 JSON 响应时又不想包含默认值字段。可以通过 response_model_exclude_unset 参数实现,有助于减少响应体并使返回的数据更加简洁。

使用 response_model_exclude_unset 参数

在路由函数的装饰器上设置参数 response_model_exclude_unset=True。如果响应模型中没有存储实际的值,可以忽略那些默认值,仅返回显式设定的值。

from typing import List, Union

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: Union[str, None] = None
    price: float
    tax: float = 10.5
    tags: List[str] = []


items = {
    "foo": {"name": "Foo", "price": 50.2},
    "bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
    "baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
}


@app.get("/items/{item_id}", response_model=Item, response_model_exclude_unset=True)
async def read_item(item_id: str):
    return items[item_id]

说明:

  • description: Union[str, None] = None 具有默认值 None
  • tax: float = 10.5 具有默认值 10.5.
  • tags: List[str] = [] 具有一个空列表 [] 作为默认值。

例如,向路由处理函数发送 namefoo 的请求,则响应(将不包括默认值)为:

{
    "name": "Foo",
    "price": 50.2
}

说明:FastAPI 通过 Pydantic 模型的 .dict() 配合该方法的 exclude_unset 参数 来实现此功能。

还可以使用:

  • response_model_exclude_defaults=True
  • response_model_exclude_none=True

参考 Pydantic 文档 中对 exclude_defaults 和 exclude_none 的描述。

具有与默认值相同值的字段,它们的值被显式地设定,而非取自默认值。

例如,向路由处理函数发送 namebaz 的请求,则响应为:

{
    "name": "Baz",
    "description": None,
    "price": 50.2,
    "tax": 10.5,
    "tags": []
}

注意:默认值可以是任何值,不仅仅是 None。它可以是一个列表 [],一个 float 类型的值等等。

使用 response_model_include 和 response_model_exclude

如果只有一个 Pydantic 模型,并且向从输出中移除一些数据,可以使用 response_model_include 或 response_model_exclude 来包含或排除一些属性。

在路由处理函数的装饰器上使用 response_model_include 和 response_model_exclude。它们接收一个由属性名称 str 组成的 set 来包含或排除一些属性。

from typing import Union

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    description: Union[str, None] = None
    price: float
    tax: float = 10.5


items = {
    "foo": {"name": "Foo", "price": 50.2},
    "bar": {"name": "Bar", "description": "The Bar fighters", "price": 62, "tax": 20.2},
    "baz": {
        "name": "Baz",
        "description": "There goes my baz",
        "price": 50.2,
        "tax": 10.5,
    },
}


@app.get(
    "/items/{item_id}/name",
    response_model=Item,
    response_model_include={"name", "description"},
)
async def read_item_name(item_id: str):
    return items[item_id]


@app.get("/items/{item_id}/public", response_model=Item, response_model_exclude={"tax"})
async def read_item_public_data(item_id: str):
    return items[item_id]

说明:

  • {"name", "description"} 创建一个具有两个值的 set 集合,等同于 set(["name", "description"])

使用 list 而不是 set

如果不使用 set 而是 list 或者 tuple,FastAPI 仍会将其转换为 set 进行工作。

@app.get(
    "/items/{item_id}/name",
    response_model=Item,
    response_model_include=["name", "description"],
)
async def read_item_name(item_id: str):
    return items[item_id]


@app.get("/items/{item_id}/public", response_model=Item, response_model_exclude=["tax"])
async def read_item_public_data(item_id: str):
    return items[item_id]

虽然 response_model_includeresponse_model_exclude 可以用来控制在响应中包含或排除某些属性,但在某些情况下,使用多个 Pydantic 类来分割模型可能更合适。

  • 避免完整模型暴露:即使使用 response_model_includeresponse_model_exclude 来包含或排除某些属性,FastAPI 的 OpenAPI 定义和生成的 JSON Schema 仍然会基于完整的模型。这意味着,客户端在查看 API 文档时,仍然可以看到模型的所有字段。
  • 清晰的数据结构:通过使用多个 Pydantic 类,可以更明确地定义哪些数据会在 API 响应中公开。这样可以提高代码的可维护性和可识别性。

更多模型

参考 https://fastapi.tiangolo.com/zh/tutorial/extra-models/

依赖注入

依赖注入是一种软件设计模式,用于通过声明的方式将所需依赖项提供给代码运行。FastAPI 使用内置的依赖注入系统,让开发者能够简洁地定义依赖关系,而无需显式地手动实例化对象或调用函数。

依赖注入的常用场景:

  • 共享业务逻辑,复用代码
  • 共享数据库连接
  • 实现安全、验证、角色权限等等

依赖注入的基本工作机制如下介绍。

创建依赖项

依赖项是一个函数,且可以使用与路由处理函数相同的参数:

from typing import Union

async def common_parameters(
    q: Union[str, None] = None, skip: int = 0, limit: int = 100
):
    return {"q": q, "skip": skip, "limit": limit}

说明:

  • async def common_parameters() 依赖项函数的形式和结构与路由处理函数一样。可以把依赖项当作么有装饰器(即没有 @aap.get("/some-path"))的路由处理函数。依赖项可以返回各种内容。
  • 本例中的依赖项预期接收如下参数:
    • 类型为 str 的可选查询参数 q
    • 类型为 int 的可选查询参数 skip,默认值是 0
    • 类型为 int 的可选查询参数 limit,默认值是 100
    • 依赖项返回包含这些值的 dict

声明依赖项

与在路由处理函数的参数中使用 BodyQuery 的方式相同,声明依赖项需要使用 Depends

Depends 是 FastAPI 的核心依赖注入工具,它可以接受一个可调用对象(通常是函数),并在路由处理函数之前处理这些依赖项。

from fastapi import Depends, FastAPI

app = FastAPI()

@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
    return commons


@app.get("/users/")
async def read_users(commons: dict = Depends(common_parameters)):
    return commons

FastAPI 的依赖注入流程:

  • 调用依赖项函数:FastAPI 根据依赖函数的定义和传入参数,解析所需的依赖并调用依赖项函数。
  • 返回结果注入:依赖项函数的返回值会作为参数传递给路径操作函数。
  • 支持嵌套依赖:一个依赖项函数可以依赖另一个依赖项函数,FastAPI 会递归解析这些嵌套依赖并确保它们正确执行。

是否使用 async

FastAPI 调用依赖项的方式与路由处理函数一样,因此,定义依赖项函数,也要应用与路由处理函数相同的规则。

既可以使用异步的 async def, 也可以使用普通的 def 定义依赖项。

在普通的 def 路由处理函数中,可以声明异步的 async def 依赖项;也可以在异步的 async def 路由处理函数中声明普通的 def 依赖项。

其他与依赖注入相关联或等效的术语:

  • 资源(Resource)
  • 提供方(Provider)
  • 服务(Service)
  • 可注入(Injectable)
  • 组件(Component)

与 OpenAPI 集成

依赖项及子依赖项的所有请求声明、验证和需求都可以集成在同一个 OpenAPI 概图。

交互文档中显示依赖项的所有信息:

Pasted image 20240626160645.png

FastAPI 层级式依赖

层级式依赖注入系统可以定义依赖其他依赖项的依赖项。依赖项层级数构建后,依赖注入系统会处理所有依赖项及其子依赖项,并为每一步操作提供注入结果。

比如,下面有 4 个 API 路由处理(端点):

  • /items/public/
  • /items/private/
  • /users/{user_id}/activate
  • /items/pro/

开发者可以使用依赖项及其子依赖项为这些路径操作添加不同的权限:

Pasted image 20240625083748.png

FastAPI 兼容性

依赖注入系统如此简洁的特性,让 FastAPI 可以与下列系统兼容:

  • 关系型数据库
  • NoSQL 数据库
  • 外部支持库
  • 外部 API
  • 认证和鉴权系统
  • API 使用监控系统
  • 响应数据注入系统
  • 等等

后续会继续学习在关系型数据库、NoSQL 数据库、安全等方面的依赖项使用场景。

中间件

中间件在 Web 框架(例如 FastAPI、Django、Flask 等)中是一种拦截机制,可以处理请求和响应的额外逻辑。

中间件的工作流程:

  1. 接收请求
    • 中间件拦截客户端的每个请求。
    • 它可以执行身份验证、日志记录、请求重写等操作。
  2. 传递给路由处理
    • 经过所有中间件处理后,请求最终传递到匹配的路由处理函数。
  3. 接收响应
    • 路由处理函数生成响应后,响应会经过所有中间件的处理(一般是反向顺序)。
    • 中间件可以对响应进行修改,例如添加 CORS 头、修改内容等。
  4. 返回响应
    • 最终,中间件返回客户端可用的响应。

说明:

  • 中间件以 链式调用 的形式运行,一个中间件完成后会调用下一个,直到请求到达核心路由处理函数。
  • 中间件通常可以分为:
    • 前置中间件:在请求到达路由之前处理。
    • 后置中间件:在响应返回客户端之前处理。

创建中间件

要创建中间件,可以在函数上使用装饰器 @app.middleware("http")

定义了一个 FastAPI 的 HTTP 中间件,用于计算每个请求的处理时间,并将结果作为自定义响应头 X-Process_Time 添加到 HTTP 响应中。

代码如下:

#test/main.py

from typing import Union
from fastapi import FastAPI, Depends


# 依赖注入
async def common_parameters(
        q: Union[str, None] = None, skip: int = 0, limit: int = 100
):
    return {"q": q, "skip": skip, "limit": limit}


@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
    return commons


@app.get("/users/")
async def read_users(commons: dict = Depends(common_parameters)):
    return commons


from fastapi import Request
import time


# 定义HTTP中间件
# 计算每个HTTP请求的处理时间
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process_Time"] = str(process_time)
    return response

说明:

  • call_next 是由 FastAPI 框架自动注入到中间件函数中的一个参数,实际上它是一个调用链中的回调函数,用于将请求传递给下一个中间件或者最终的路由处理函数。
  • 自定义请求头和 X- 前缀:
    • 自定义的 HTTP 请求头通常使用 'X-' 前缀以示与标准头区分。例如:X-Process-Time
    • 现代建议倾向于避免使用 X- 前缀,但它仍被广泛采用,尤其是为了兼容旧系统。
  • 将处理时间以字符串形式添加到响应头 X-Process_Time 中,供客户端查看。
  • 浏览器支持与 expose_headers
    • 如果您希望让浏览器客户端访问自定义响应头,必须在 CORS 配置中显式暴露这些头部。
    • StarletteCORS 中间件允许通过参数 expose_headers 配置需要暴露的响应头。这样,浏览器中的 JavaScript 脚本可以通过 fetchXMLHttpRequest 获取 X-Process-Time 的值。

如上述代码,调用中间件的实际流程:

  1. 请求到达应用:如客户端发送 get("/users/") 请求。
  2. 进入中间件:请求首先进入定义的中间件 add_process_time_header
  3. 记录开始时间:中间件记录当前开始时间 start_time
  4. 调用 call_next(request):中间件调用 call_next(request),将请求传递给匹配的路由处理器(即 read_users)。
  5. 处理请求:路由处理器 read_user 处理请求并返回一个响应对象 commons
  6. 返回中间件:响应对象返回到中间件。
  7. 计算处理时间:中间件计算处理时间 process_time
  8. 添加响应头:中间件将处理时间以 X-Process-Time 作为键添加到响应头中。
  9. 返回响应:中间件返回修改后的响应对象,包含处理时间信息。
  10. 客户端接收响应:客户端接收到包含 X-Process-Time 的响应头,可以查看请求的处理时间。

客户端发送 get("/users/") 请求,且接收响应结果如下:

Pasted image 20240627102639.png

其他中间件

稍后在 Advanced User Guide: Advanced Middleware阅读学习更多关于中间件的内容。

CORS(跨域资源共享)

参考学习 CORS(跨域资源共享)

接入关系型数据库

FastAPI 适用于与任何风格的数据库进行通信。

在这里,使用接入 MySQL 数据库的示例。

ORM(对象关系映射)

ORM 具有在代码中的对象和数据库表(关系)之间转换(映射)的工具。

一般来说,使用 ORM,可以创建与数据库表相对应的映射类,该类的每个属性代表一个列,具有名称和类型。该类的每个实例对象都代表数据库表中的一行数据。

文件结构

使用之前的项目 fastAPI,创建一个子目录名为 sqlapp,其结构如下:

.
└── sqlapp
    ├── __init__.py
    ├── crud.py
    ├── database.py
    ├── main.py
    ├── models.py
    └── schemas.py

__init__.py 只是一个空文件,但它告诉 Python 子目录 sqlapp 及其所有模块文件是一个包。

使用 FastAPI 和 SQLAlchemy 构建 API 时,通常需要使用两种类型的模型:

  • SQLAlchemy 模型用于与数据库交互,定义数据库的表结构
  • Pydantic 模型用于数据验证和序列化。

在 FastAPI 中接入 MySQL,可以使用 SQLAlchemymysqlclient 等库。

安装依赖库

pip install sqlalchemy mysqlclient

创建 SQLAlchemy 部件

参考[[深入 Django 框架模型]]中创建的数据库配置信息。定义 MySQL 数据库连接的配置。

# sqlapp/database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# SQLALCHEMYDATABASEURL = "sqlite:///./sqlapp.db"
# SQLALCHEMYDATABASEURL = "postgresql://user:password@postgresserver/db"

# 为 SQLALchemy 定义数据库
# SQLALCHEMYDATABASEURL = "mysql://username:password@host:port/databasename"
SQLALCHEMY_DATABASE_URL = "mysql://hellokitty:123456@192.168.230.131:3306/vote"

# 创建 SQLAlchemy 引擎
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,  # connect_args={"check_same_thread": False}
)

# 创建数据库会话实例
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 创建数据库模型基类
Base = declarative_base()

说明:

  • connect_args={"check_same_thread": False} 是一个特定于 SQLite 数据库的配置,用于允许多个线程共享同一个数据库连接。SQLite 默认会检查线程安全性(check_same_thread=True),即禁止其他线程访问已在某个线程中创建的连接。

创建数据库模型

定义两个 SQLALchemy ORM 模型:UserItem,通过 ORM 自动映射模型到数据库表。

# sqlapp/database.py

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base

class User(Base):
    # 模型使用的数据库表名
    __tablename__ = "users"

    # 定义列
    id = Column(Integer, primary_key=True)  # 主键
    email = Column(String(50), unique=True, index=True)  # 唯一并且有索引的电子邮件列
    hashed_password = Column(String(100))  # 哈希密码列
    is_active = Column(Boolean, default=True)  # 活跃状态列,默认为True

    # 定义与Item模型的关系,表示每个User可以有多个Item
    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String(100), index=True)
    description = Column(String(200), index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))  # 外键,引用 users 表的 id 列

    # 定义与User模型的关系,表示每个Item属于一个User
    owner = relationship("User", back_populates="items")

说明:

  • 通过 SQLAlchemy 中的 relationshipForeignKey 来定义模型间的关系。
  • 一对多关系:一个 User 可以拥有多个 Item(即一对多)。通过 User 模型中的 items 属性访问与该用户相关的多个 Item,通过 Item 模型中的 owner 属性访问项目的所属用户。
  • 外键Item 表中的 owner_id 列作为外键,引用了 User 表中的 id 列,确保每个项目对应一个用户。

创建 Pydantic 模型

Pydantic 模型定义了数据的输入和输出格式,规范和验证数据的输入和输出,确保数据的结构和类型在应用中是严格控制的。

# sqlapp/schemas.py
from typing import Union
from pydantic import BaseModel


# 定义Item基础属性模型
class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None


# 创建项目的输入数据模型
class ItemCreate(ItemBase):
    pass


# 定义从数据库中检索到的Item数据模型,扩展id和ownerid字段
class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        from_attributes = True  # 使Pydantic模型能够自动从ORM对象中提取属性


# 创建用户的输入数据模型
class UserBase(BaseModel):
    email: str


# 定义从数据库中检索到的用户数据模型
class UserCreate(UserBase):
    password: str


# 定义从数据库中检索到的用户数据模型,扩展id、isactive和items字段
class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []

    class Config:
        from_attributes = True  # 使Pydantic模型能够自动从ORM对象中提取属性

说明:

  • 在 Pydantic V2 中,配置键 orm_mode 被重命名为 from_attributes
  • from_attributes = True:指示 Pydantic 在处理数据库对象时使用 ORM 模式,允许直接从 ORM 模型中读取数据。
  • 定义 class UserCreate 类时,不包含 idis_active 这些属性。这是因为模型定义时指定了 id 为主键,默认数据库会自动生成一个唯一的标识符(自增主键);is_active 定义时指定了默认布尔值 True

注意:通常,SQLAlchemy 和许多其他情况默认是“延迟加载”。例如,它们不会从数据库中获取数据,除非你尝试访问包含该数据的属性。

CRUD 操作

利用 SQLAlchemy 的 ORM 功能,编写可重用的函数用来与数据库中的数据进行交互。

CRUD 操作分别为:增加(Create)、查询(Read)、更改(Update)、删除(Delete),即增删改查。

# sqlapp/crud.py

from sqlalchemy.orm import Session
from . import models, schemas


def get_user(db: Session, user_id: int):
    """
    通过用户ID从数据库中获取用户
    :param db: 数据库会话
    :param user_id: 用户的唯一标识符
    :return: 返回匹配的User用户对象,或者None如果未找到
    """
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    """
    通过电子邮件从数据库中获取用户
    :param db:
    :param email:
    :return:
    """
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    """
    从数据库中获取多个用户,支持分页
    :param db:
    :param skip:跳过的记录数
    :param limit: 返回的记录数限制
    :return: 返回用户对象列表
    """
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    """
    在数据库中创建一个新用户,使用用户提供的电子邮件和密码
    :param db:
    :param user:包含用户信息的Pydantic模型类型
    :return:
    """
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)  # 创建新用户对象
    db.add(db_user)  # 将新用户添加到数据库会话中
    db.commit()  # 提交事务,将所有挂起的更改保存到数据库中
    db.refresh(db_user)  # 刷新实例对象,使其属性与数据库中的状态保存一致
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    """
    从数据库中获取多个项目,支持分页
    :param db:
    :param skip:
    :param limit:
    :return:
    """
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    """
    为指定用户创建一个新项目
    :param db:
    :param item:包含项目创建信息的Pydantic模型
    :param user_id:项目所属用户的唯一标识符
    :return:
    """
    # 创建Item对象,使用传入的项目数据(通过item.dict()转换为字典)和user_id
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

说明:

  • models.Item(**item.dict(), owner_id=user_id):创建 Item 对象,从 Pydantic 模型实例 itemuser_id 中获取其属性。
    • **item.dict():利用了 Python 的解包操作符 ** 将解包字典为关键字参数并传递给 SQLAlchemy 模型的构造函数。

注意:此示例中的密码未经过哈希处理不安全,生产环境不能以明文形式保存它们。更多详细信息,见官方文档的安全内容

FastAPI 应用程序

使用 FastAPI 框架创建 Web 应用程序,定义一些 API 端点。

# sqlapp/main.py

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

# 根据定义的SQLAlchemy模型创建所有数据库表
models.Base.metadata.create_all(bind=engine)

# 创建FastAPI应用实例
app = FastAPI()


# 创建数据库会话依赖项
def get_db():
    """
    创建一个数据库会话,并将其传递给依赖项调用方
    :return:
    """
    db = SessionLocal()  # 创建数据库会话
    try:
        yield db  # 将数据库会话对象传递给调用方
    finally:
        db.close()  # 当调用方执行完成后,关闭数据库会话


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """
    创建新用户
    :param user: 请求体中的用户数据
    :param db: 数据库会话,使用依赖注入
    :return:
    """
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """
    获取多个用户的API端点
    :param skip:
    :param limit:
    :param db:
    :return:
    """
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    """
    获取单个用户
    :param user_id:
    :param db:
    :return:
    """
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
        user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    """
    为指定用户创建一个新项目
    :param user_id: 用户的唯一标识符,作为路径参数
    :param item: 请求体中的项目数据
    :param db: 数据库会话
    :return: 返回Item类型实例
    """
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """
    获取多个项目
    :param skip:
    :param limit:
    :param db:
    :return:
    """
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

说明:

  • yield db 用于返回数据库会话对象,是 Python 生成器的一个应用。生成器可以在请求期间提供资源(如数据库会话),并在请求处理结束后自动清理资源(关闭会话)。yield 在 FastAPI 中与依赖注入系统结合使用,管理数据库连接等资源。
  • 虽然 SQLAlchemy 模型涉及数据库通信,通常使用异步的 async def 进行路由处理,但由于数据库操作本身通常是同步的,FastAPI 可以使用同步的 def 来声明路由和依赖项,避免不必要的复杂性。这是因为数据库操作不会像 I/O 密集型操作那样直接从异步中获益,反而同步调用会更加简单和直观。

运行项目

使用 Uvicorn 命令运行服务:

(venv) PS D:\PycharmProjects\fastAPI> uvicorn sqlapp.main:app --reload

打开浏览器访问 http://127.0.0.1:8000/docs。

创建的 FastAPI 应用程序端点:

Pasted image 20240629062101.png

与 FastAPI 应用程序交互,添加用户数据到数据库表

Pasted image 20240628234904.png

检查 FastAPI 应用视图内添加的用户数据

Pasted image 20240629060341.png

检查 MySQL 数据库,已创建表有 usersitems,且 users 数据表有添加一行数据。

Pasted image 20240629061238.png

中间件替代数据库会话

如果不使用带有 yield 的依赖项,可以使用类似的方法在“中间件”中设置会话。

添加中间件为每个请求创建一个新的 SQLAlchemy 的 SessionLocal 会话,将其添加到请求中,然后在请求完成后关闭它。

如下将注释代码替换中间件的内容:

''''''
# 创建数据库会话依赖项
# def getdb():
#     """
#     创建一个数据库会话,并将其传递给依赖项调用方
#     :return:
#     """
#     db = SessionLocal()  # 创建数据库会话
#     try:
#         yield db  # 将数据库会话对象传递给调用方
#     finally:
#         db.close()  # 当调用方执行完成后,关闭数据库会话

from fastapi import Request, Response


@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()  # 为当前请求创建一个数据库会话
        response = await call_next(request)  # 继续处理请求
    finally:
        request.state.db.close()  # 确保在请求完成后关闭数据库会话
    return response


def get_db(request: Request):
    return request.state.db  # 获取当前请求中的数据库会话对象

''''''

说明:

  • request.stateRequest 对象的属性。它用于存储附加到请求本身的任意对象,如本例中数据库会话。
  • 尽管可以用中间件实现,但最好使用带有 yield 的依赖项。有以下原因:
    • 因为中间件实现需要更多的代码且更复杂一些。
    • 中间件必须有一个 async 函数,因为要等待网络,它可能会阻止你的应用程序并降低性能。
    • 每个请求都会运行一个中间件。这将为每个请求创建一个连接,即使处理该请求的路径操作不需要数据库。

数据库迁移

Alembic 是 SQLAlchemy 项目的标准数据库迁移工具,主要用于追踪和应用数据库模式(schema)的变化。

虽然 Alembic 不是必需的,但在项目中使用它可以极大地简化数据库模式的管理。尤其在团队开发和持续集成/部署。

当涉及到更新数据模型结构进行数据库迁移时,按照如下一般操作步骤。

安装 Almebic

pip install alembic

使用 ALembic

初始化 Alembic

在项目的根目录下,初始化 Alembic:

alembic init alembic

初始化示例:

> alembic init alembic
Creating directory 'D:\\PycharmProjects\\fastAPI\\alembic' ...  done
Creating directory 'D:\\PycharmProjects\\fastAPI\\alembic\\versions' ...  done
Generating D:\PycharmProjects\fastAPI\alembic.ini ...  done
Generating D:\PycharmProjects\fastAPI\alembic\env.py ...  done
Generating D:\PycharmProjects\fastAPI\alembic\README ...  done
Generating D:\PycharmProjects\fastAPI\alembic\script.py.mako ...  done
Please edit configuration/connection/logging settings in 'D:\\PycharmProjects\\fastAPI\\alembic.ini' before proceeding.

配置 Alembic

在项目根目录,编辑 alembic.ini 文件,配置数据库连接字符串。如下内容:

# alembic.ini
; sqlalchemy.url = driver://user:pass@localhost/dbname
sqlalchemy.url = mysql://hellokitty:123456@192.168.230.131:3306/vote

alembic/env.py 文件中,配置模型元数据。修改如下内容:

# alembic/env.py
# from myapp import mymodel
# targetmetadata = mymodel.Base.metadata

from sqlapp import models
target_metadata = models.Base.metadata

说明:

  • models.Base.metadata 提供的是与 ORM 模型相关的元数据集合,通常用于描述数据库模式。这个对象是 SQLAlchemy 声明式系统的重要组成部分。
    • models 模块通常包含项目中定义的所有 ORM 模型。
    • Base 是 ORM 模型的基类,通常通过 declarative_base() 创建。
    • 通过 Base.metadata,可以访问与所有定义模型相关的元数据信息。

创建初始迁移

首先,确保数据库已经初始化并且包含定义的模型。然后,使用 ALembic 自动生成迁移文件。

alembic revision --autogenerate -m "Initial migration"

说明:

  • 这将在 alembic/versions 目录下创建一个新的迁移文件,包含当前模型的数据库模式。

应用迁移

用于将数据库的模式升级到指定版本:

alembic upgrade head
  • 指定 head 表示将数据库迁移到最新的版本(也就是当前迁移脚本中定义的最顶层版本)。

管理更新迁移

当对模型进行更改时,例如添加新字段或新表,使用如下命令生成新的迁移:

alembic revision --autogenerate -m "Add new field to User model"

然后应用迁移:

alembic upgrade head

部署

虚拟化部署(Docker)

参阅文档:容器中的 FastAPI - Docker

参考

FastAPI 框架官方文档

本文由作者按照 CC BY 4.0 进行授权。