Complete FastAPI masterclass from scratch

01 - Introduction

02 - Getting started

001 Section overview

002 Installation on Windows

pip install fastapi[all]
pip install fastapi uvicorn

# 1_hello-fastapi/main.py
from fastapi import FastAPI

app = FastAPI()

@app.get('/')
def index():
    return 'hello fastapi'
uvicorn main:app --reload

004 FastAPI features

005 HelloWorld discussion

03 - GET method

001 Section overview

002 Path parameters

# 1_hello-fastapi/main.py
from fastapi import FastAPI

app = FastAPI()


@app.get('/')
def index():
    return {'msg': 'hello fastapi'}


# 正常运行
@app.get('/blog/all')
def get_all_blogs():
    return {'msg': 'all blogs provided'}


@app.get('/blog/{id}')
# def get_blog(id):
def get_blog(id: int):  # 如果url里的参数类型不对(比如3.3), 会返回错误信息
    return {'msg': f'Blog {id}'}

# 放在这会被上面的/{id}拦截, 然后报错, 注意方法的声明顺序
# @app.get('/blog/all')
# def get_all_blogs():
#     return {'msg': 'all blogs provided'}

003 Predefined values

# 2_get-method/main.py
from fastapi import FastAPI
from enum import Enum

app = FastAPI() 

###


# 博客类型
class BlogType(str, Enum):
    short = 'short'
    story = 'story'
    howto = 'howto'


@app.get('/blog/type/{type}')
def get_blog_type(type: BlogType):
    return {'msg': f'Blog type {type}'}


@app.get('/blog/{id}')
###

004 Query parameters


swagger页面空白问题

# 2_get-method/main.py
from fastapi import FastAPI
from typing import Optional
from enum import Enum

from fastapi.staticfiles import StaticFiles

app = FastAPI() 

### 

@app.get('/blog/all')
# def get_all_blogs(page=1, pageSize=10):
def get_all_blogs(page=1, pageSize: Optional[int] = None):
    return {'msg': f'All {pageSize} blogs on page {page}'}


@app.get('/blog/{id}/comments/{comment_id}') # ?valid=true&username=Tom 
def get_comment(id: int, comment_id: int, valid: bool = True, username: Optional[str] = None):
    return {'msg': f'blog_id {id} comment_id {comment_id} valid {valid} username {username}'}

04 - Operation description

001 Section overview

002 Status code

from fastapi import FastAPI, Response, status

# 本接口只会返回 status_code 404
# @app.get('/blog/{id}', status_code=404)
@app.get('/blog/{id}', status_code=status.HTTP_404_NOT_FOUND)
# def get_blog(id):
def get_blog(id: int):  # 如果url里的参数类型不对(比如3.3), 会返回错误信息
    if id > 5:
        return {'error': f'Blog {id} not found'}
    else:
        return {'msg': f'Blog {id}'}


@app.get('/v2/blog/{id}')
# @app.get('/v2/blog/{id}', status_code=status.HTTP_200_OK)
# def get_blog(id):
def get_blog_copy(id: int, resp: Response):  # 如果url里的参数类型不对(比如3.3), 会返回错误信息
    if id > 5:
        resp.status_code = status.HTTP_404_NOT_FOUND
        return {'error': f'Blog {id} not found'}
    else:
        resp.status_code = status.HTTP_200_OK
        return {'msg': f'Blog {id}'}

003 Tags

004 Summary and description

@app.get('/blog/all', tags=['blog'],
         description='This api call simulates fetching all blogs',
         response_description="The list of available blogs")
# def get_all_blogs(page=1, pageSize=10):
def get_all_blogs(page=1, pageSize: Optional[int] = None):
    return {'msg': f'All {pageSize} blogs on page {page}'}


@app.get('/blog/{id}/comments/{comment_id}', tags=['blog', 'comment'])  # ?valid=true&username=Tom
def get_comment(id: int, comment_id: int, valid: bool = True, username: Optional[str] = None):
    """
      Simulates retrieving a comment of a blog
      - **id** mandatory path parameter
      - **comment_id** mandatory path parameter
      - **bool** optional query parameter
      - **username** optional query parameter
    """ 
    # 注释会被识别然后添加到swagger doc
    return {'msg': f'blog_id {id} comment_id {comment_id} valid {valid} username {username}'}

005 Response description

05 - Routers

001 Section overview

002 Routers

003 Refactoring the app

# 4_Routers/router/blog_get.py
from fastapi import APIRouter, Response, status
from typing import Optional
from enum import Enum

router = APIRouter(
    prefix="/blog",
    tags=["blog"],
)


# 正常运行
# @router.get('/all')
# def get_all_blogs():
#     return {'msg': 'all blogs provided'}

@router.get('/all',
            description='This api call simulates fetching all blogs',
            response_description="The list of available blogs")
def get_all_blogs(page=1, pageSize: Optional[int] = None):
    return {'msg': f'All {pageSize} blogs on page {page}'}


@router.get('/{id}/comments/{comment_id}', tags=['comment'])
def get_comment(id: int, comment_id: int, valid: bool = True, username: Optional[str] = None):
    """
      Simulates retrieving a comment of a blog
      - **id** mandatory path parameter
      - **comment_id** mandatory path parameter
      - **bool** optional query parameter
      - **username** optional query parameter
    """
    return {'msg': f'blog_id {id} comment_id {comment_id} valid {valid} username {username}'}


# 博客类型
class BlogType(str, Enum):
    short = 'short'
    story = 'story'
    howto = 'howto'


@router.get('/type/{type}')
def get_blog_type(type: BlogType):
    return {'msg': f'Blog type {type}'}


@router.get('/{id}')
def get_blog_copy(id: int, resp: Response):
    if id > 5:
        resp.status_code = status.HTTP_404_NOT_FOUND
        return {'error': f'Blog {id} not found'}
    else:
        resp.status_code = status.HTTP_200_OK
        return {'msg': f'Blog {id}'}
# 4_Routers/main.py
from fastapi import FastAPI
from router import blog_get

app = FastAPI()
# 导入blog_get.py里的路由
app.include_router(blog_get.router)


@app.get('/')
def index():
    return {'msg': 'hello fastapi'}

004 Adding a second router

from fastapi import APIRouter

router = APIRouter(
    prefix='/blog',
    tags=['blog'],
)


@router.post('/new')
def create_blog():
    return {'data': 'blog created'}
# 4_Routers/main.py
from fastapi import FastAPI
from router import blog_get, blog_post

app = FastAPI()
# 导入路由
app.include_router(blog_get.router)
app.include_router(blog_post.router)

06 - Parameters

001 Section overview

002 Request body

# 5_Parameters/router/blog_post.py
from typing import Optional

from fastapi import APIRouter
from pydantic import BaseModel

router = APIRouter(
    prefix='/blog',
    tags=['blog'],
)


class BlogModel(BaseModel):
    title: str
    content: str
    nb_comments: int
    # 加了 = None 才能不传
    published: Optional[bool] = None


@router.post('/new')
def create_blog(blog: BlogModel):
    return {'data': blog}

003 Path and query parameters

@router.post('/new/{id}')
def create_blog(blog: BlogModel, id: int, version: int = 1):
    return {
        'id': id,
        'data': blog,
        'version': version
    }

004 Parameter metadata

from fastapi import Query 
###
@router.post('/new/{id}/comment')
def create_blog_comment(blog: BlogModel, id: int,
                        comment_id: int = Query(
                            None,
                            title="Id of the comment",
                            description='Some description for comment_id',
                            alias='commentId',
                            deprecated=True,  # 已弃用 
                        )):
    return {
        'blog': blog,
        'id': id,
        'comment_id': comment_id
    }

005 Validators

@router.post('/new/{id}/comment')
def create_blog_comment(
        blog: BlogModel, id: int,
        comment_id: int = Query(
            None,
            title="Id of the comment",
            description='Some description for comment_id',
            alias='commentId',
            deprecated=True,  # 已弃用
        ),
        # content: str = Body('hi how are you')
        # content: str = Body(...)
        content: str = Body(
            Ellipsis,
            min_length=10,
            max_length=20,
            # regex='^hi', # 正则 - hi开头
            regex='^[a-z\s]*$', # 正则
        )
):
    return {
        'blog': blog,
        'id': id,
        'comment_id': comment_id,
        'content': content
    }

006 Multiple values

它的是swagger-ui 5.9.0, 我这cdn是5.6.2, 不一样

@router.post('/new/{id}/comment')
def create_blog_comment(
        blog: BlogModel, id: int,
        comment_id: int = Query(
            None,
            title="Id of the comment",
            description='Some description for comment_id',
            alias='commentId',
            deprecated=True,  # 已弃用
        ),
        # content: str = Body('hi how are you')
        # content: str = Body(...)
        content: str = Body(
            Ellipsis,
            min_length=10,
            max_length=20,
            # regex='^hi', # 正则 - hi开头
            regex='^[a-z\s]*$',  # 正则
        ),
        # v: Optional[List[str]] = Query(None)
        v: Optional[List[str]] = Query(['1', '2', '3'])
):
    return {
        'blog': blog,
        'id': id,
        'comment_id': comment_id,
        'content': content,
        'version': v
    }

007 Number validators

@router.post('/new/{id}/comment/{comment_id}')
def create_blog_comment(
        blog: BlogModel, id: int,
        comment_title: str = Query(
            None,
            title="Title of the comment",
            description='Some description for comment_title',
            alias='commentTitle',
        ),
        # content: str = Body('hi how are you')
        # content: str = Body(...)
        content: str = Body(
            Ellipsis,
            min_length=10,
            max_length=20,
            # regex='^hi', # 正则 - hi开头
            regex='^[a-z\s]*$',  # 正则
        ),
        # v: Optional[List[str]] = Query(None)
        v: Optional[List[str]] = Query(['1', '2', '3']),
        # Path parameters cannot have a default value
        # comment_id: int = Path(None, gt=5, le=10)
        comment_id: int = Path(..., gt=5, le=10)
):
    return {
        'blog': blog,
        'id': id,
        'comment_id': comment_id,
        'comment_title': comment_title,
        'content': content,
        'version': v
    }

008 Complex subtypes

class Image(BaseModel):
    url: str
    alias: str


class BlogModel(BaseModel):
    title: str
    content: str
    nb_comments: int
    # 加了 = None 才能不传
    published: Optional[bool] = None
    tags: List[str] = []
    metadata: Dict[str, str] = {'k1': 'v1'}
    image: Optional[Image] = None

07 - Database with SQLAlchemy

001 Section overview

002 Quick intro about Dependencies

003 Databases in FastAPI

005 Create database and table

pip install sqlalchemy fastapi uvicorn
# 6_SQLAlchemy/db/database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./fastapi-practice.db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

006 Solving a common python environment problem

007 Create database and table continued

# 6_SQLAlchemy/db/database.py
# 
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()  
# 6_SQLAlchemy/db/models.py
from db.database import Base
from sqlalchemy import Boolean, Column, Integer, String


class DbUser(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String)
    email = Column(String)
    password = Column(String)
# 6_SQLAlchemy/main.py
from fastapi import FastAPI
from router import blog_get, blog_post
from db import models
from db.database import engine

app = FastAPI()
# 导入路由 ...


@app.get('/')
def index():
    return {'msg': 'hello fastapi'}


# 创建数据库表
models.Base.metadata.create_all(engine)

008 Write data in database

pip install bcrypt passlib
# 6_SQLAlchemy/schemas.py -> 在程序中使用的orm表对象
from pydantic import BaseModel


class UserBase(BaseModel):
    username: str
    email: str
    password: str


# 展示给前端, 不展示密码
class UserDisplay(BaseModel):
    username: str
    email: str
    # 让系统自动返回这个类 DbUser -> UserDisplay
    class Config():
        # F:\anaconda\envs\py39f\lib\site-packages\pydantic\_internal\_config.py:321:
        # UserWarning: Valid config keys have changed in V2:
        # * 'orm_mode' has been renamed to 'from_attributes'
        # orm_mode = True
        from_attributes = True
# 6_SQLAlchemy/db/db_user.py -> 操作表的函数
from sqlalchemy.orm.session import Session
from db.hash import Hash
from schemas import UserBase
from db.models import DbUser


def create_user(db: Session, request: UserBase):
    new_user = DbUser(
        username=request.username,
        email=request.email,
        password=Hash.bcrypt(request.password)
    )
    db.add(new_user)
    db.commit()
    db.refresh(new_user)  # 刷新id
    return new_user
# router/user.py
from fastapi import APIRouter, Depends
from schemas import UserBase, UserDisplay
from db.database import get_db
from db import db_user
from sqlalchemy.orm import Session

router = APIRouter(
    prefix="/user",
    tags=["users"]
)


# Create user
@router.post('/', response_model=UserDisplay)
def create_user(request: UserBase, db: Session = Depends(get_db)):
    return db_user.create_user(db, request)

# Read user

# Update user

# Delete user

009 Process review

010 Create and read

# 6_SQLAlchemy/db/db_user.py -> 操作表的函数
from sqlalchemy.orm.session import Session
from db.hash import Hash
from schemas import UserBase
from db.models import DbUser


def create_user(db: Session, request: UserBase):
    ### 


def get_all_users(db: Session):
    return db.query(DbUser).all()


def get_user(db: Session, user_id: int):
    return db.query(DbUser).filter(DbUser.id == user_id).first()
# router/user.py
from fastapi import APIRouter, Depends
from schemas import UserBase, UserDisplay
from db.database import get_db
from db import db_user
from typing import List
from sqlalchemy.orm import Session

router = APIRouter(
    prefix="/user",
    tags=["users"]
)


# Create user
###


# Read All user
@router.get('/', response_model=list[UserDisplay])
def get_all_users(db: Session = Depends(get_db)):
    return db_user.get_all_users(db)


# Read One user
@router.get('/{id}', response_model=UserDisplay)
def get_user(id: int, db: Session = Depends(get_db)):
    return db_user.get_user(db, id)

# Update user

# Delete user

011 Update and delete

# 6_SQLAlchemy/db/db_user.py -> 操作表的函数
from sqlalchemy.orm.session import Session
from db.hash import Hash
from schemas import UserBase
from db.models import DbUser


###


def update_user(db: Session, id: int, request: UserBase):
    user = db.query(DbUser).filter(DbUser.id == id)
    user.update({
        DbUser.username: request.username,
        DbUser.email: request.email,
        DbUser.password: Hash.bcrypt(request.password)
    })
    db.commit()
    return 'ok'

def delete_user(db: Session, id: int):
    user = db.query(DbUser).filter(DbUser.id == id).first()
    db.delete(user)
    db.commit()
    return 'ok'
# router/user.py
from fastapi import APIRouter, Depends
from schemas import UserBase, UserDisplay
from db.database import get_db
from db import db_user
from typing import List
from sqlalchemy.orm import Session

router = APIRouter(
    prefix="/user",
    tags=["users"]
)


###


# Update user
@router.post('/{id}/update')
def update_user(id: int, request: UserBase, db: Session = Depends(get_db)):
    return db_user.update_user(db, id, request)


# Delete user
@router.delete('/delete/{id}')
def delete_user(id: int, db: Session = Depends(get_db)):
    return db_user.delete_user(db, id)

012 Relationships

# 6_SQLAlchemy/db/models.py
from sqlalchemy.orm import relationship

from db.database import Base
from sqlalchemy import Column, ForeignKey
from sqlalchemy.sql.sqltypes import Boolean, Integer, String


class DbUser(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String)
    email = Column(String)
    password = Column(String)
    # back_populates 对应的是另一个表的字段
    items = relationship("DbArticle", back_populates="user")


class DbArticle(Base):
    __tablename__ = "articles"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String)
    content = Column(String)
    published = Column(Boolean)
    user_id = Column(Integer, ForeignKey('users.id'))
    # back_populates 对应的是另一个表的字段
    user = relationship("DbUser", back_populates="items")
# 6_SQLAlchemy/schemas.py -> 在程序中使用的orm表对象
from typing import List

from pydantic import BaseModel


# Article inside UserDisplay
class Article(BaseModel):
    title: str
    content: str
    published: bool

    class Config():
        from_attributes = True


class UserBase(BaseModel):
    username: str
    email: str
    password: str


# 展示给前端, 不展示密码
class UserDisplay(BaseModel):
    username: str
    email: str
    items: List[Article]

    # 让系统自动返回这个类 DbUser -> UserDisplay
    class Config():
        # F:\anaconda\envs\py39f\lib\site-packages\pydantic\_internal\_config.py:321:
        # UserWarning: Valid config keys have changed in V2:
        # * 'orm_mode' has been renamed to 'from_attributes'
        # orm_mode = True
        from_attributes = True


# User inside ArticleDisplay
class User(BaseModel):
    id: int
    username: str

    class Config():
        from_attributes = True


class ArticleBase(BaseModel):
    title: str
    content: str
    published: bool
    creator_id: int


class ArticleDisplay(BaseModel):
    title: str
    content: str
    published: bool
    user: User

    class Config():
        from_attributes = True
# db/db_article.py
from sqlalchemy.orm.session import Session

from db.models import DbArticle

from schemas import Article, ArticleBase


def create_article(db: Session, request: ArticleBase):
    new_article = DbArticle(
        title=request.title,
        content=request.content,
        published=request.published,
        user_id=request.creator_id
    )
    db.add(new_article)
    db.commit()
    db.refresh(new_article)
    return new_article


def get_article(db: Session, id: int):
    article = db.query(DbArticle).filter(DbArticle.id == id).first()
    return article
# router/article.py
from fastapi import APIRouter, Depends
from schemas import ArticleBase, ArticleDisplay
from db.database import get_db
from db import db_article
from typing import List
from sqlalchemy.orm import Session

router = APIRouter(
    prefix="/article",
    tags=["article"]
)


# create article
@router.post("/", response_model=ArticleDisplay)
def create_article(request: ArticleBase, db: Session = Depends(get_db)):
    return db_article.create_article(db, request)


# get article
@router.get("/{id}", response_model=ArticleDisplay)
def get_article(id: int, db: Session = Depends(get_db)):
    return db_article.get_article(db, id)
# 6_SQLAlchemy/main.py
from fastapi import FastAPI
from router import blog_get, blog_post, user, article
from db import models
from db.database import engine

app = FastAPI()
# 导入路由
app.include_router(blog_get.router)
app.include_router(blog_post.router)
app.include_router(user.router)
app.include_router(article.router)

08 - Concepts

001 Section overview

002 Error handling

# 7_concept/db/db_article.py

def get_article(db: Session, id: int):
    article = db.query(DbArticle).filter(DbArticle.id == id).first()
    if not article:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Article with id {id} not found"
        )
    return article
# 7_concept/db/db_user.py -> 操作表的函数

def get_user(db: Session, user_id: int):
    user = db.query(DbUser).filter(DbUser.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {id} not found"
        )
    return user


def update_user(db: Session, id: int, request: UserBase):
    user = db.query(DbUser).filter(DbUser.id == id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {id} not found"
        )
    user.update({
        DbUser.username: request.username,
        DbUser.email: request.email,
        DbUser.password: Hash.bcrypt(request.password)
    })
    db.commit()
    return 'ok'


def delete_user(db: Session, id: int):
    user = db.query(DbUser).filter(DbUser.id == id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {id} not found"
        )
    db.delete(user)
    db.commit()
    return 'ok'
# exceptions.py

class StoryException(Exception):
    def __init__(self, name: str):
        self.name = name
# 7_concept/db/db_article.py
from sqlalchemy.orm.session import Session
from db.models import DbArticle
from exceptions import StoryException
from schemas import Article, ArticleBase
from fastapi import HTTPException, status


def create_article(db: Session, request: ArticleBase):
    # 用于模拟异常
    if request.content.startswith('Once upon a time'):
        raise StoryException('No stories please')
    new_article = DbArticle(
        title=request.title,
        content=request.content,
        published=request.published,
        user_id=request.creator_id
    )
    db.add(new_article)
    db.commit()
    db.refresh(new_article)
    return new_article
# 7_concept/main.py
from fastapi import FastAPI, Request
from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from exceptions import StoryException
from router import blog_get, blog_post, user, article
from db import models
from db.database import engine

@app.exception_handler(StoryException)
def story_exception_handler(request: Request, exc: StoryException):
    return JSONResponse(
        status_code=418,  # 418 测试状态码
        content={"detail": exc.name}
    )


# 这个会拦截所有异常, 不建议
# @app.exception_handler(HTTPException)
# def custom_exception_handler(request: Request, exc: HTTPException):
#     return JSONResponse(str(exc), status_code=400)

003 Custom Response

# router/product.py
from fastapi import APIRouter
from fastapi.responses import Response, HTMLResponse, PlainTextResponse

router = APIRouter(
    prefix='/product',
    tags=['product']
)

products = ['watch', 'camera', 'phone']


@router.get('/all')
def get_all_products():
    # return product
    data = " ".join(products)
    return Response(content=data, media_type="text/plain")


@router.get('/{id}', responses={
    # 让接口描述是指定的而不是默认的application/json
    200: {
        "content": {
            'text/html': {
                "<div>Product</div>"
            }
        },
        'description': 'Returns the HTML for an object'
    },
    404: {
        "content": {
            'text/plain': {
                "Product not available"
            }
        },
        'description': 'A clear text error message'
    }
})
def get_product(id: int):
    if id > len(products) - 1:
        out = 'Product not available'
        return PlainTextResponse(status_code=404, content=out, media_type='text/plain')
    else:
        product = products[id]
        out = f"""
        <head>
            <style>
            .product {{
                width: 500px;
                height: 30px;
                border: 2px inset green;
                background-color: lightblue;
                text-align: center;
            }}
            </style>
        <head>
        <div class="product">{product}</div> 
        """
        return HTMLResponse(content=out, media_type='text/html')

004 Headers

# router/product.py

@router.get('/withheader')
def get_products(
        response: Response,
        # custom_header: Optional[str] = Header(None)
        custom_header: Optional[List[str]] = Header(None)
):
    response.headers['X-Custom-Header'] = ', '.join(custom_header)
    return products

005 Cookies

# router/product.py 
from fastapi import APIRouter, Header, Cookie  

@router.get('/all')
def get_all_products():
    # return product
    data = " ".join(products)
    res = Response(content=data, media_type="text/plain")
    res.set_cookie(key='test_cookie', value='test')
    return res


@router.get('/withheader')
def get_products(
        response: Response,
        # custom_header: Optional[str] = Header(None)
        custom_header: Optional[List[str]] = Header(None),
        test_cookie: Optional[str] = Cookie(None) # 发请求时, 会自动从cookie里读取 test_cookie
):
    response.headers['X-Custom-Header'] = ', '.join(custom_header)
    return {
        'data': products,
        'custom_header': custom_header,
        'my_cookie': test_cookie
    }

006 Form data

pip install python-multipart
# router/product.py
from fastapi import APIRouter, Header, Cookie, Form

@router.post('/new')
def create_product(name: str = Form(...)):
    products.append(name)
    return products

007 CORS

# 7_concept/main.py
from fastapi.middleware.cors import CORSMiddleware

# 允许跨域
origins = [
    "http://localhost:3000/",
]

app.add_middleware(
    CORSMiddleware,
    # allow_origins=["*"],
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

09 - Authentication

001 Section overview

002 Authentication

003 Securing an endpoint

还没设置身份验证端点, 下节继续

# auth/oauth2.py
from fastapi.security import OAuth2PasswordBearer

# 身份验证令牌对应的变量名 token
oauth2_schema = OAuth2PasswordBearer(tokenUrl="token")
# router/article.py
from auth.oauth2 import oauth2_schema

# get article
@router.get("/{id}", response_model=ArticleDisplay)
def get_article(id: int, db: Session = Depends(get_db), token: str = Depends(oauth2_schema)):
    return db_article.get_article(db, id)

005 Generating access token

# 生成 token key
openssl rand -hex 32 
pip install python-jose
# auth/authentication.py
from fastapi import APIRouter, HTTPException, status
from fastapi.param_functions import Depends
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm.session import Session
from db.database import get_db
from db.models import DbUser
from db.hash import Hash
from auth import oauth2

router = APIRouter(
    tags=["authentication"]
)


@router.post("/token")
def get_token(request: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    user = db.query(DbUser).filter(DbUser.username == request.username).first()
    if not user:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invalid credentials")
    if not Hash.verify(user.password, request.password):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid password")

    access_token = oauth2.create_access_token(data={"sub": user.username})
    return {
        'access_token': access_token,
        'token_type': 'bearer',
        'user_id': user.id,
        'username': user.username,
    }
# main.py
from fastapi import FastAPI, Request
from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from exceptions import StoryException
from router import blog_get, blog_post, user, article, product
from auth import authentication
from db import models
from db.database import engine

app = FastAPI()
# 导入路由
app.include_router(blog_get.router)
app.include_router(blog_post.router)
app.include_router(user.router)
app.include_router(article.router)
app.include_router(product.router)
app.include_router(authentication.router)
# auth/oauth2.py
from fastapi.security import OAuth2PasswordBearer
from typing import Optional
from datetime import timedelta, datetime
from jose import jwt

# 身份验证令牌对应的变量名 token
oauth2_schema = OAuth2PasswordBearer(tokenUrl="token")

# openssl rand -hex 32 生成的随机字符串
SECRET_KEY = "0d41a5ae989119e68e2d2945a5e227721073e5fd2ff2e55fb7e9d70c4a064fd7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

点击接口右边的锁图标

填入账号密码

登录后再请求就有带token

006 User authentication

# auth/oauth2.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from typing import Optional
from datetime import timedelta, datetime
from jose import jwt, JWTError
from sqlalchemy.orm import Session
from db.database import get_db
from db import db_user

# ...


def get_current_user(token: str = Depends(oauth2_schema), db: Session = Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    user =db_user.get_user_by_username(db, username)
    if user is None:
        raise credentials_exception

    return user
# router/article.py
from fastapi import APIRouter, Depends
from schemas import ArticleBase, ArticleDisplay, UserBase
from db.database import get_db
from db import db_article
from typing import List
from auth.oauth2 import oauth2_schema, get_current_user
from sqlalchemy.orm import Session

router = APIRouter(
    prefix="/article",
    tags=["article"]
)


# create article
@router.post("/", response_model=ArticleDisplay)
def create_article(request: ArticleBase, db: Session = Depends(get_db),
                   current_user: UserBase = Depends(get_current_user)):
    return db_article.create_article(db, request)


# get article
# @router.get("/{id}", response_model=ArticleDisplay)
# def get_article(id: int, db: Session = Depends(get_db), token: str = Depends(oauth2_schema)):
@router.get("/{id}")
def get_article(id: int, db: Session = Depends(get_db), current_user: UserBase = Depends(get_current_user)):
    return {
        'data': db_article.get_article(db, id),
        'current_user': current_user
    }
# db/db_user.py -> 操作表的函数
from fastapi import HTTPException, status
from sqlalchemy.orm.session import Session
from db.hash import Hash
from schemas import UserBase
from db.models import DbUser

# ...


def get_user_by_username(db: Session, username: str):
    user = db.query(DbUser).filter(DbUser.username == username).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with username {username} not found"
        )
    return user


# ...

10 - Working with files

001 Section overview

002 File

# router/file.py
from fastapi import APIRouter, File

router = APIRouter(
    prefix="/file",
    tags=["file"]
)


@router.post('/file')
def get_file(file: bytes = File(...)):
    content = file.decode('utf-8')
    lines = content.split('\n')
    return {'lines': lines}
# main.py
from fastapi import FastAPI, Request
from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from exceptions import StoryException
from router import blog_get, blog_post, user, article, product, file
from auth import authentication
from db import models
from db.database import engine

app = FastAPI()
# 导入路由
app.include_router(blog_get.router)
app.include_router(blog_post.router)
app.include_router(user.router)
app.include_router(article.router)
app.include_router(product.router)
app.include_router(authentication.router)
app.include_router(file.router)

003 UploadFile

# router/file.py

@router.post('/upload')
def get_uploadfile(upload_file: UploadFile = File(...)):
    # 同名文件会覆盖, 所以可以使用随机名称替代filename保存
    path = f"files/{upload_file.filename}"
    with open(path, 'w+b') as buffer:
        # 保存本地
        shutil.copyfileobj(upload_file.file, buffer)

    return {
        'filename': path,
        'type': upload_file.content_type
    }

004 Making files statically available

pip install aiofiles
# main.py
from fastapi import FastAPI, Request
from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from exceptions import StoryException
from router import blog_get, blog_post, user, article, product, file
from auth import authentication
from db import models
from db.database import engine
from fastapi.staticfiles import StaticFiles

# ...

# 静态文件
app.mount("/files", StaticFiles(directory="files"), name="files")

005 Downloading files

# router/file.py

@router.get('/download/{name}', response_class=FileResponse)
def get_file(name: str):
    path = f'files/{name}'
    return path

11 - Tasks

001 Section overview

002 Deployment

003 Debugging

004 Testing


  目录