FastAPI中WebSocket消息处理的创新玩法

3周前发布 gsjqwyl
15 0 0

标题:FastAPI中WebSocket消息交互的新颖操作

一、文本消息的接收与发送

运行环境要求:Python 3.8及以上版本。安装依赖命令为:pip install fastapi==0.68.0 uvicorn==0.15.0 websockets==10.3 pydantic==1.10.7

from fastapi import FastAPI, WebSocket
import datetime

app = FastAPI()

@app.websocket("/ws/chat")
async def handle_websocket_chat(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # 接收客户端发送的文本消息
            client_msg_content = await websocket.receive_text()
            # 对消息进行处理(示例:添加时间戳)
            server_reply = f"[{datetime.now()}] Server received: {client_msg_content}"
            # 向客户端发送响应文本
            await websocket.send_text(server_reply)
    except WebSocketDisconnect:
        print("客户端已断开连接")

消息处理流程简述:

  1. 客户端通过ws://协议建立WebSocket连接
  2. 服务端调用await websocket.accept()接受连接请求
  3. 进入循环接收并处理receive_text()send_text()
  4. 异常发生时自动断开连接

序列图展示:

participant Client as 客户端
participant Server as 服务端
Client->>Server: 建立WebSocket连接(ws://)
activate Server
Server->>Server: 调用await websocket.accept()接受连接
activate Client
loop 消息收发
    Client->>Server: receive_text()
    Server-->>Client: send_text()
end
alt 异常处理
    Server->>Client: 自动断开连接
end
deactivate Server
deactivate Client

应用场景:适用于实时聊天室、协同编辑系统、实时日志监控等场景

二、二进制数据的传输处理

@app.websocket("/ws/file-transfer")
async def handle_websocket_file(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # 接收客户端发送的二进制数据
            binary_data = await websocket.receive_bytes()
            # 示例:保存接收到的二进制数据到文件
            with open("received_file.bin", "wb") as file_obj:
                file_obj.write(binary_data)
            # 向客户端发送确认消息
            await websocket.send_bytes(b"FILE_RECEIVED")
    except WebSocketDisconnect:
        print("文件传输中断")

二进制处理要点:

  • 使用receive_bytes()send_bytes()方法进行数据传输
  • 适合传输图片、音频、视频等二进制格式文件
  • 建议对大文件进行分块传输(结合自定义消息头协议)

三、JSON消息的序列化与自动解析

from pydantic import BaseModel

class MessageStructure(BaseModel):
    user: str
    content: str
    timestamp: float

@app.websocket("/ws/json-demo")
async def handle_websocket_json(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            json_data = await websocket.receive_json()
            # 通过Pydantic模型进行数据验证
            validated_message = MessageStructure(**json_data)
            # 进行业务逻辑处理
            processed_info = validated_message.dict()
            processed_info["status"] = "PROCESSED"
            # 返回处理结果给客户端
            await websocket.send_json(processed_info)
    except ValidationError as err:
        await websocket.send_json({"error": str(err)})

自动验证流程:

  1. 接收原始JSON格式数据
  2. 通过Pydantic模型进行数据清洗
  3. 自动进行类型转换和字段验证
  4. 结构化错误信息返回给客户端

四、消息接收循环与超时控制

from websockets.exceptions import ConnectionClosed
import asyncio

@app.websocket("/ws/with-timeout")
async def handle_websocket_timeout(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            try:
                # 设置单次接收超时时间为10秒
                data = await asyncio.wait_for(websocket.receive_text(), timeout=10)
                await process_received_message(data)
            except asyncio.TimeoutError:
                # 发送心跳包保持连接
                await websocket.send_text("HEARTBEAT")
    except ConnectionClosed:
        print("连接正常关闭")

超时控制策略:

  • 使用asyncio.wait_for设置单次接收超时时间
  • 定期发送心跳包维持连接状态
  • 分类处理不同类型的异常(正常关闭/异常断开)

课后小测试

Q1:如何处理同时接收文本和二进制消息的场景?
A:通过receive()方法判断消息类型进行处理:

message = await websocket.receive()
if message["type"] == "websocket.receive.text":
    handle_text_message(message["text"])
elif message["type"] == "websocket.receive.bytes":
    handle_binary_message(message["bytes"])

Q2:为何推荐使用Pydantic进行JSON验证?
A:① 自动进行类型转换 ② 严格检查字段约束 ③ 抵御无效数据入侵 ④ 便于生成API文档

常见报错解决方案

422 Validation Error

{
  "detail": [
    {
      "loc": [
        "body",
        "timestamp"
      ],
      "msg": "field required",
      "type": "value_error.missing"
    }
  ]
}

解决办法:
1. 检查客户端发送的JSON字段是否完整
2. 确认时间戳是否为数字类型
3. 可以为字段设置默认值,例如timestamp: float = None

WebSocketTimeoutException

  • 成因:长时间未进行消息收发操作
  • 解决:调整wait_for的超时参数,添加心跳机制维持连接

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何在FastAPI中玩转WebSocket消息处理?

往期文章归档:

免费好用的热门在线工具

© 版权声明

相关文章

没有相关内容!

暂无评论

none
暂无评论...