找回密码
 立即注册
首页 业界区 业界 FastAPI 流式响应中,如何优雅处理客户端断连后的数据库 ...

FastAPI 流式响应中,如何优雅处理客户端断连后的数据库操作?

梢疠 2025-11-25 19:05:01
FastAPI 流式响应中,如何优雅处理客户端断连后的数据库操作?

在使用 FastAPI 构建 AI 对话应用时,StreamingResponse 是实现打字机效果的绝佳工具。通过 yield 逐步返回内容,用户体验非常流畅。但一个棘手的问题随之而来:如果用户在 AI 回答的过程中取消对话或中断了连接,后端会发生什么?我们如何确保对话记录等重要数据依然能被可靠地保存到数据库中?
问题出现过程

1. 客户端发起流式对话请求

我们从一个典型的流式对话接口开始。我们使用依赖注入来获取一个 SQLAlchemy 的 AsyncSession,在对话开始时创建消息,在对话结束后更新 AI 的回答。
流式对话原始代码(伪代码)
  1. from fastapi import APIRouter, Depends
  2. from fastapi.responses import StreamingResponse
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from app.db import get_session # 依赖注入函数
  5. router = APIRouter()
  6. async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):
  7.     # 模拟流式生成
  8.     try:
  9.       # 1.创建一对消息 query answer
  10.       message_user_id = create_message(conversation_id, query, session)
  11.       message_ai_id = create_message(conversation_id, "", seesion)
  12.       # 2.ai对话
  13.       full_response = ""
  14.       for chunk in model.generate(user_input): # 假设这是你的 AI 模型
  15.           yield chunk
  16.           full_response += chunk
  17.     except Exception as e:
  18.       pass
  19.     finally:
  20.       # 更新answer消息
  21.       async save_conversation(session, full_response)
  22.       print("对话已保存。")
  23. async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):
  24.     # 根据传来的session和message_ai_id 更新当前消息即可
  25.     await session.commit()
  26. @router.post("/chat")
  27. async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):
  28.     # get_session 单例
  29.     generator = stream_chat_generator(user_input, conversation_id, session)
  30.     return StreamingResponse(generator, media_type="text/event-stream")
复制代码
2. 客户端取消对话(主动断开)

当用户取消发送时,会抛出这个异常
  1. pymysql.err.InterfaceError:
复制代码
原因:当客户端断开时 ,FastAPI 会立即把它的 session连接回收掉,底层的那个物理连接被标记为 Cancelled,然后执行finally的时候,再往下传原来session连接就不对了,save_conversation函数就会抛pymysql.err.InterfaceError。
问题解决尝试

尝试一:在 save_conversation 函数中创建新连接

一个自然的想法是:既然旧的 session 不能用了,那就在保存的时候检查一下,如果不可用就创建一个新的。
代码更新
  1. from fastapi import APIRouter, Depends
  2. from fastapi.responses import StreamingResponse
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from app.db import get_session # 依赖注入函数
  5. router = APIRouter()
  6. async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):
  7.     # 模拟流式生成
  8.     try:
  9.       # 1.创建一对消息 query answer
  10.       message_user_id = create_message(conversation_id, query, session)
  11.       message_ai_id = create_message(conversation_id, "", seesion)
  12.       # 2.ai对话
  13.       full_response = ""
  14.       for chunk in model.generate(user_input): # 假设这是你的 AI 模型
  15.           yield chunk
  16.           full_response += chunk
  17.     except Exception as e:
  18.       pass
  19.     finally:
  20.       # 更新answer消息
  21.       async save_conversation(session, full_response)
  22.       print("对话已保存。")
  23. async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):
  24.     # 根据传来的session和message_ai_id 更新当前消息即可
  25.     # 1.先判断传入的连接是否可用
  26.     is_pass = session.inspect(self.db).closed
  27.     if is_pass:
  28.       # 继续更新消息
  29.     else:
  30.       # 创建新连接  
  31.       async with AsyncSessionLocal() as session:
  32.         # 继续更新消息  这又报错了⚠
  33.     await session.commit()
  34. @router.post("/chat")
  35. async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):
  36.     # get_session 单例
  37.     generator = stream_chat_generator(user_input, conversation_id, session)
  38.     return StreamingResponse(generator, media_type="text/event-stream")
复制代码
结果:失败! 没想到,即使创建了新的 session,依然抛出了 pymysql.err.InterfaceError。
原因分析:之所以还会抛错误,原因是这个新会话 依然在使用已经被取消的连接池资源,因为 FastAPI/Starlette 在主请求取消时,会把整个 AsyncSessionLocal() 对象的连接都标记为 “cancelled”。即便你重新 async with AsyncSessionLocal(),底层复用的还是同一个数据库连接池里的连接,而那个连接刚被 cancel。
重新创建个数据库引擎 是肯定可以的,但是只是对话后更新,这么搞完全没必要。
或者创建个独立线程,在新线程中去创建新连接,应该是可以的,个人还是感觉比较重,浪费资源。
尝试二:创建个协程去执行save_conversation

代码更新
  1. from fastapi import APIRouter, Depends
  2. from fastapi.responses import StreamingResponse
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from app.db import get_session # 依赖注入函数
  5. router = APIRouter()
  6. async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):
  7.     # 模拟流式生成
  8.     try:
  9.       # 1.创建一对消息 query answer
  10.       message_user_id = create_message(conversation_id, query, session)
  11.       message_ai_id = create_message(conversation_id, "", seesion)
  12.       # 2.ai对话
  13.       full_response = ""
  14.       for chunk in model.generate(user_input): # 假设这是你的 AI 模型
  15.           yield chunk
  16.           full_response += chunk
  17.     except Exception as e:
  18.       pass
  19.     finally:
  20.       # 更新answer消息
  21.       # 创建协程执行
  22.       asyncio.create_task(save_conversation(session, full_response))
  23.       
  24.       print("对话已保存。")
  25. async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):
  26.     # 根据传来的session和message_ai_id 更新当前消息即可
  27.     # 1.先判断传入的连接是否可用
  28.     is_pass = session.inspect(self.db).closed
  29.     if is_pass:
  30.       # 继续更新消息
  31.     else:
  32.       # 创建新连接  
  33.       async with AsyncSessionLocal() as session:
  34.         # 继续更新消息  这又报错了⚠
  35.     await session.commit()
  36. @router.post("/chat")
  37. async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):
  38.     # get_session 单例
  39.     generator = stream_chat_generator(user_input, conversation_id, session)
  40.     return StreamingResponse(generator, media_type="text/event-stream")
复制代码
疑惑点:asyncio.create_task 启动的协程仍然跑在同一个线程和进程里,也会复用那个全局的连接池,理论上确实还有可能拿到刚才那个被 cancel 的连接啊?
主要在于操作的时序和上下文隔离

  • 先清理,后执行
    当原始请求被取消后,FastAPI 会立即开始清理与该请求相关的资源(包括回收它持有的数据库连接)。这个清理动作在 finally 块中调用 create_task 之前就已经触发了。我们派生出的后台任务是在这个清理逻辑之后才启动的。

  • 上下文隔离
    这个后台协程已经完全不挂在 HTTP 请求的上下文上了。客户端断开与否,都影响不了它的独立运行。只要连接池中还有任意一个好连接,它就能完成写入。

  • 高成功率
    因为顺序已经变成了:先断开、先清理 → 再新建、再执行,所以新任务向连接池请求时,拿到那个“坏掉”连接的概率已经大大降低。连接池会优先分配一个健康的、空闲的连接。

即使在极端情况下又拿到了旧连接,它也很有可能在 session.begin() 阶段就失败,我们还可以在后台任务的 try...except 块里加入重试逻辑(比如 await asyncio.sleep(0.1) 后重试),进一步提高健壮性。

大量测试后,发现真没问题
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册