FastAPI 流式响应中,如何优雅处理客户端断连后的数据库操作?
在使用 FastAPI 构建 AI 对话应用时,StreamingResponse 是实现打字机效果的绝佳工具。通过 yield 逐步返回内容,用户体验非常流畅。但一个棘手的问题随之而来:如果用户在 AI 回答的过程中取消对话或中断了连接,后端会发生什么?我们如何确保对话记录等重要数据依然能被可靠地保存到数据库中?
问题出现过程
1. 客户端发起流式对话请求
我们从一个典型的流式对话接口开始。我们使用依赖注入来获取一个 SQLAlchemy 的 AsyncSession,在对话开始时创建消息,在对话结束后更新 AI 的回答。
流式对话原始代码(伪代码)- from fastapi import APIRouter, Depends
- from fastapi.responses import StreamingResponse
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.db import get_session # 依赖注入函数
- router = APIRouter()
- async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):
- # 模拟流式生成
- try:
- # 1.创建一对消息 query answer
- message_user_id = create_message(conversation_id, query, session)
- message_ai_id = create_message(conversation_id, "", seesion)
- # 2.ai对话
- full_response = ""
- for chunk in model.generate(user_input): # 假设这是你的 AI 模型
- yield chunk
- full_response += chunk
- except Exception as e:
- pass
- finally:
- # 更新answer消息
- async save_conversation(session, full_response)
- print("对话已保存。")
- async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):
- # 根据传来的session和message_ai_id 更新当前消息即可
- await session.commit()
- @router.post("/chat")
- async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):
- # get_session 单例
- generator = stream_chat_generator(user_input, conversation_id, session)
- return StreamingResponse(generator, media_type="text/event-stream")
复制代码 2. 客户端取消对话(主动断开)
当用户取消发送时,会抛出这个异常- pymysql.err.InterfaceError:
复制代码 原因:当客户端断开时 ,FastAPI 会立即把它的 session连接回收掉,底层的那个物理连接被标记为 Cancelled,然后执行finally的时候,再往下传原来session连接就不对了,save_conversation函数就会抛pymysql.err.InterfaceError。
问题解决尝试
尝试一:在 save_conversation 函数中创建新连接
一个自然的想法是:既然旧的 session 不能用了,那就在保存的时候检查一下,如果不可用就创建一个新的。
代码更新- from fastapi import APIRouter, Depends
- from fastapi.responses import StreamingResponse
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.db import get_session # 依赖注入函数
- router = APIRouter()
- async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):
- # 模拟流式生成
- try:
- # 1.创建一对消息 query answer
- message_user_id = create_message(conversation_id, query, session)
- message_ai_id = create_message(conversation_id, "", seesion)
- # 2.ai对话
- full_response = ""
- for chunk in model.generate(user_input): # 假设这是你的 AI 模型
- yield chunk
- full_response += chunk
- except Exception as e:
- pass
- finally:
- # 更新answer消息
- async save_conversation(session, full_response)
- print("对话已保存。")
- async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):
- # 根据传来的session和message_ai_id 更新当前消息即可
- # 1.先判断传入的连接是否可用
- is_pass = session.inspect(self.db).closed
- if is_pass:
- # 继续更新消息
- else:
- # 创建新连接
- async with AsyncSessionLocal() as session:
- # 继续更新消息 这又报错了⚠
- await session.commit()
- @router.post("/chat")
- async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):
- # get_session 单例
- generator = stream_chat_generator(user_input, conversation_id, session)
- return StreamingResponse(generator, media_type="text/event-stream")
复制代码 结果:失败! 没想到,即使创建了新的 session,依然抛出了 pymysql.err.InterfaceError。
原因分析:之所以还会抛错误,原因是这个新会话 依然在使用已经被取消的连接池资源,因为 FastAPI/Starlette 在主请求取消时,会把整个 AsyncSessionLocal() 对象的连接都标记为 “cancelled”。即便你重新 async with AsyncSessionLocal(),底层复用的还是同一个数据库连接池里的连接,而那个连接刚被 cancel。
重新创建个数据库引擎 是肯定可以的,但是只是对话后更新,这么搞完全没必要。
或者创建个独立线程,在新线程中去创建新连接,应该是可以的,个人还是感觉比较重,浪费资源。
尝试二:创建个协程去执行save_conversation
代码更新- from fastapi import APIRouter, Depends
- from fastapi.responses import StreamingResponse
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.db import get_session # 依赖注入函数
- router = APIRouter()
- async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):
- # 模拟流式生成
- try:
- # 1.创建一对消息 query answer
- message_user_id = create_message(conversation_id, query, session)
- message_ai_id = create_message(conversation_id, "", seesion)
- # 2.ai对话
- full_response = ""
- for chunk in model.generate(user_input): # 假设这是你的 AI 模型
- yield chunk
- full_response += chunk
- except Exception as e:
- pass
- finally:
- # 更新answer消息
- # 创建协程执行
- asyncio.create_task(save_conversation(session, full_response))
-
- print("对话已保存。")
- async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):
- # 根据传来的session和message_ai_id 更新当前消息即可
- # 1.先判断传入的连接是否可用
- is_pass = session.inspect(self.db).closed
- if is_pass:
- # 继续更新消息
- else:
- # 创建新连接
- async with AsyncSessionLocal() as session:
- # 继续更新消息 这又报错了⚠
- await session.commit()
- @router.post("/chat")
- async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):
- # get_session 单例
- generator = stream_chat_generator(user_input, conversation_id, session)
- return StreamingResponse(generator, media_type="text/event-stream")
复制代码 疑惑点:asyncio.create_task 启动的协程仍然跑在同一个线程和进程里,也会复用那个全局的连接池,理论上确实还有可能拿到刚才那个被 cancel 的连接啊?
主要在于操作的时序和上下文隔离:
- 先清理,后执行
当原始请求被取消后,FastAPI 会立即开始清理与该请求相关的资源(包括回收它持有的数据库连接)。这个清理动作在 finally 块中调用 create_task 之前就已经触发了。我们派生出的后台任务是在这个清理逻辑之后才启动的。
- 上下文隔离
这个后台协程已经完全不挂在 HTTP 请求的上下文上了。客户端断开与否,都影响不了它的独立运行。只要连接池中还有任意一个好连接,它就能完成写入。
- 高成功率
因为顺序已经变成了:先断开、先清理 → 再新建、再执行,所以新任务向连接池请求时,拿到那个“坏掉”连接的概率已经大大降低。连接池会优先分配一个健康的、空闲的连接。
即使在极端情况下又拿到了旧连接,它也很有可能在 session.begin() 阶段就失败,我们还可以在后台任务的 try...except 块里加入重试逻辑(比如 await asyncio.sleep(0.1) 后重试),进一步提高健壮性。
大量测试后,发现真没问题
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |