借助FastAPI与RBAC构建固若金汤的安全体系

2个月前发布 gsjqwyl
15 0 0

利用FastAPI与RBAC构建严密的安全架构

cmdragon_cn.png
cmdragon_cn.png

一、FastAPI安全认证与RBAC系统原理

1.1 RBAC基础理念

基于角色的访问控制(RBAC)是守护系统资源的经典模式,其核心包含三个关键构成:
用户:是系统中进行操作的主体。
角色:作为权限的集合载体,例如管理员角色、普通用户角色等。
权限:指具体的操作权限,像商品删除权限、订单修改权限等。

1.2 FastAPI安全组件

核心依赖的软件包及版本要求如下:

fastapi == 0.95.2
uvicorn == 0.22.0
python-jose[cryptography] == 3.3.0
passlib[bcrypt] == 1.7.4

安装命令:

pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt]

二、电商平台RBAC系统搭建

2.1 数据模型定义

借助Pydantic和SQLAlchemy来构建领域模型:

from pydantic import BaseModel
from sqlalchemy import Column, Integer, String, Table, ForeignKey

class PermissionBase(BaseModel):
    code: str  # 权限编码,例如 "order:delete"
    description: str

class RoleCreate(BaseModel):
    name: str
    permission_ids: list[int]

# SQLAlchemy模型
user_role = Table(
    'user_role', Base.metadata,
    Column('user_id', ForeignKey('users.id')),
    Column('role_id', ForeignKey('roles.id'))
)

role_permission = Table(
    'role_permission', Base.metadata,
    Column('role_id', ForeignKey('roles.id')),
    Column('permission_id', ForeignKey('permissions.id'))
)

2.2 认证系统实现

JWT令牌的生成与验证:

from jose import JWTError, jwt
from datetime import datetime, timedelta

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

def create_access_token(data: dict) -> str:
    expire = datetime.utcnow() + timedelta(hours=1)
    return jwt.encode(
        {**data, "exp": expire},
        SECRET_KEY,
        algorithm=ALGORITHM
    )

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    try:
        payload = jwt.decode(token, SECRET_KEY, [ALGORITHM])
        return await UserService.get_user(payload["sub"])
    except JWTError:
        raise HTTPException(401, "Invalid token")

2.3 权限校验系统

动态权限的依赖注入实现:

def check_permission(permission_code: str):
    async def _permission_checker(
            user: User = Depends(get_current_user)
    ):
        if not any(p.code == permission_code for p in user.permissions):
            raise HTTPException(403, "Permission denied")
    return Depends(_permission_checker)

# 在路由中使用
@app.delete("/orders/{order_id}")
async def delete_order(
        order_id: str,
        _=check_permission("order:delete")
):
    return OrderService.delete_order(order_id)

三、安全防护策略

3.1 敏感数据加密

密码存储的安全处理方式:

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

3.2 请求速率限制

防止暴力破解的请求速率限制措施:

from fastapi import Request
from fastapi.middleware import Middleware

class RateLimiter:
    def __init__(self, times: int, seconds: int):
        self.times = times
        self.seconds = seconds

    async def __call__(self, request: Request):
        client_ip = request.client.host
        # 使用Redis记录请求次数
        current = await redis.incr(client_ip)
        if current > self.times:
            raise HTTPException(429, "Too many requests")
        await redis.expire(client_ip, self.seconds)

四、测试与调试

4.1 单元测试示例

权限验证的测试案例:

from fastapi.testclient import TestClient

def test_admin_access():
    client = TestClient(app)
    # 普通用户令牌
    token = create_test_token(user_type="user")
    response = client.delete(
        "/products/123",
        headers={"Authorization": f"Bearer {token}"}
    )
    assert response.status_code == 403

    # 管理员令牌
    admin_token = create_test_token(user_type="admin")
    response = client.delete(
        "/products/123",
        headers={"Authorization": f"Bearer {admin_token}"}
    )
    assert response.status_code == 200

4.2 常见错误处理

  • 错误1:401未授权
    json
    {
    "detail": "Could not validate credentials"
    }

    解决办法:
  • 检查请求头Authorization的格式是否正确。
  • 确认令牌是否已过期。
  • 验证密钥是否匹配。

  • 错误2:403权限不足
    json
    {
    "detail": "Permission denied"
    }

    预防建议:

  • 在路由处理器前添加权限检查中间件。
  • 定期审查角色与权限的分配情况。

课后测验

  • 问题1:为何要在密码哈希中使用salt?
    答案:salt随机值能够有效抵御彩虹表攻击,即便密码相同,也会生成不同的哈希值。
  • 问题2:如何实现动态权限管理?
    答案:应当构建权限管理系统的接口,允许管理员动态地对角色和权限的对应关系进行配置。
  • 问题3:为何要给JWT令牌设置过期时间?
    答案:缩短令牌的有效期能够降低令牌泄露所带来的风险,建议结合refresh token一同使用。

版本兼容说明

所有代码在以下环境下验证通过:
– Python 3.8及以上版本
– FastAPI 0.95及以上版本
– SQLAlchemy 1.4及以上版本
– Redis 6.2及以上版本

余下文章内容可点击跳转至个人博客页面,或扫码关注微信公众号“编程智域 前端至全栈交流与成长”,阅读完整文章:如何用 FastAPI 和 RBAC 打造坚不可摧的电商堡垒?

往期文章归档:

© 版权声明

相关文章

没有相关内容!

暂无评论

none
暂无评论...