Introduction
In the previous examples, the conversation history lived only in memory: once the program restarted, the history was gone, and the AI could no longer infer the user's current intent from earlier turns. It's like talking to someone with a terrible memory who forgets everything from your last meeting, which comes across as unprofessional.
If you've used LangGraph before, you know it provides a checkpointer for managing conversation history, which can persist sessions to a relational database or keep them in memory. The official MCP documentation doesn't describe a comparable feature, but that's fine: the logic for loading a conversation is simple, and we can implement a basic persistent memory ourselves.
My design stores the conversation history in SQLite and, like LangGraph, looks it up by thread_id. I simply store each message as a string, whereas LangGraph applies special handling, so this version is weaker in features and performance, but the basics all work, and I've added a timestamp column as well. If you switch the database to MySQL or Postgres, you can partition on that time column to manage historical data more effectively.
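The storage design above can be sketched with the standard-library sqlite3 module: conversations saved as plain strings, looked up by thread_id and ordered by a time column. The table and column names here are illustrative; the full client later in this article uses SQLAlchemy instead.

```python
import sqlite3

# In-memory database for the sketch; the real client uses a file (aiodemo.sqlite)
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS memory (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        thread_id TEXT,
        role TEXT,
        content TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

def add_memory(thread_id: str, role: str, content: str) -> None:
    """Append one message of a conversation, keyed by thread_id."""
    conn.execute(
        "INSERT INTO memory (thread_id, role, content) VALUES (?, ?, ?)",
        (thread_id, role, content),
    )
    conn.commit()

def get_memories(thread_id: str) -> list[tuple[str, str]]:
    """Return (role, content) pairs for a thread in chronological order."""
    rows = conn.execute(
        "SELECT role, content FROM memory "
        "WHERE thread_id = ? ORDER BY created_at, id",
        (thread_id,),
    )
    return rows.fetchall()

add_memory("1234", "user", "What's the date today?")
add_memory("1234", "assistant", "2025-09-14")
print(get_memories("1234"))
```

Ordering by `(created_at, id)` keeps insertion order even when two messages share the same timestamp.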
MCP Server
Some persistent-memory designs put conversation-history management on the MCP Server, but I think history is tightly coupled to the client, so the server here is unchanged from the earlier examples. This keeps the server simple and lets each component focus on its own job.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime

from fastmcp import Context, FastMCP


@dataclass
class UserDecision:
    decision: str = "decline"


mcp = FastMCP("Elicitation Server")


@mcp.tool()
async def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"


@mcp.tool()
async def get_date() -> str:
    """Get today's date."""
    return datetime.now().strftime("%Y-%m-%d")


@mcp.tool()
async def execute_command_local(command: str, ctx: Context, is_need_user_check: bool = False, timeout: int = 10) -> str:
    """Execute a shell command locally.

    Args:
        command (str): The shell command to execute.
        is_need_user_check (bool): Set to True when performing create, delete, or modify operations on the host, indicating that user confirmation is required.
        timeout (int): Timeout in seconds for command execution. Default is 10 seconds.
    Returns:
        str: The output of the shell command.
    """
    if is_need_user_check:
        user_check_result = await ctx.elicit(
            message=f"Do you want to execute this command(yes or no): {command}",
            response_type=UserDecision,  # response_type must be JSON Schema compatible
        )
        if user_check_result.action != "accept":
            return "User denied command execution."
    proc = None
    try:
        proc = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        stdout_str = stdout.decode().strip()
        stderr_str = stderr.decode().strip()
        if stdout_str:
            return f"Stdout: {stdout_str}"
        elif stderr_str:
            return f"Stderr: {stderr_str}"
        else:
            return "Command executed successfully with no output"
    except asyncio.TimeoutError:
        # A timed-out process is still running; terminate it before returning
        if proc is not None and proc.returncode is None:
            try:
                proc.terminate()
                await proc.wait()
            except ProcessLookupError:
                pass
        return f"Error: Command '{command}' timed out after {timeout} seconds"
    except Exception as e:
        return f"Error executing command '{command}': {str(e)}"


if __name__ == "__main__":
    mcp.run(transport="streamable-http", host="localhost", port=8001, show_banner=False)
```
The client talks to the server over HTTP, so remember to start the server first.
MCP Client
On the client side, we add async SQLite support and save every turn of the conversation to SQLite keyed by thread_id. This effectively gives the AI a "memory disk" so it can recall earlier conversations.

```python
import asyncio
import json
import readline  # For enhanced input editing
import traceback
from collections.abc import Sequence
from datetime import datetime
from typing import Optional, cast

from fastmcp import Client
from fastmcp.client.elicitation import ElicitResult
from mcp.shared.context import RequestContext
from mcp.types import ElicitRequestParams
from openai import AsyncOpenAI
from openai.types.chat import ChatCompletionMessageFunctionToolCall
from sqlalchemy import (Boolean, Column, DateTime, Integer, String, Text, func,
                        select)
from sqlalchemy.ext.asyncio import (AsyncSession, async_sessionmaker,
                                    create_async_engine)
from sqlalchemy.orm import declarative_base

from pkg.config import cfg
from pkg.log import logger

db_engine = create_async_engine("sqlite+aiosqlite:///aiodemo.sqlite")
Base = declarative_base()
session_local = async_sessionmaker(bind=db_engine, expire_on_commit=False, class_=AsyncSession)


class MCPMemory(Base):
    __tablename__ = "mcp_memory"

    id = Column(Integer, primary_key=True, autoincrement=True)
    thread_id = Column(String(20), index=True, comment="client gets memories according to thread_id")
    role = Column(String(20), comment="system/user/assistant/tool")
    tool_calls = Column(Text)
    tool_call_id = Column(String(50))
    content = Column(Text)
    is_deleted = Column(Boolean, default=False, comment="logical deletion")
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
    deleted_at = Column(DateTime, default=None)

    def __repr__(self):
        return f"<MCPMemory(id={self.id}, thread_id={self.thread_id}, role={self.role}, content={self.content}, is_deleted={self.is_deleted}, created_at={self.created_at}, updated_at={self.updated_at})>"


class MCPMemoryService:
    @classmethod
    async def get_memories(cls, thread_id: str) -> Sequence[MCPMemory]:
        async with session_local() as session:
            results = await session.execute(select(MCPMemory).where(MCPMemory.thread_id == thread_id, MCPMemory.is_deleted == False).order_by(MCPMemory.created_at))
            return results.scalars().fetchall()

    @classmethod
    async def add_memory(cls, thread_id: str, role: str, content: Optional[str] = None, tool_calls: Optional[str] = None, tool_call_id: Optional[str] = None):
        async with session_local() as session:
            memory = MCPMemory(thread_id=thread_id, role=role, content=content, tool_calls=tool_calls, tool_call_id=tool_call_id)
            session.add(memory)
            await session.commit()
            await session.refresh(memory)
            return memory


async def init_db():
    async with db_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


class MCPHost:
    """MCP host that manages the connection to and interaction with the MCP server."""

    def __init__(self, server_uri: str, thread_id: str):
        """Initialize the MCP client.

        Args:
            server_uri (str): URI of the MCP server.
            thread_id (str): Conversation thread used to look up history.
        """
        # MCP client connection, with our elicitation handler attached
        self.mcp_client: Client = Client(server_uri, elicitation_handler=self.elicitation_handler)
        # Async OpenAI client used to talk to the LLM
        self.llm = AsyncOpenAI(
            base_url=cfg.llm_base_url,
            api_key=cfg.llm_api_key,
        )
        self.thread_id = thread_id
        # In-memory conversation history
        self.messages = []

    async def close(self):
        """Close the MCP client connection."""
        if self.mcp_client:
            await self.mcp_client.close()

    async def load_memory(self):
        """Load historical conversation records from the database."""
        results = await MCPMemoryService.get_memories(self.thread_id)
        if results:
            for rst in results:
                # Assistant messages that requested tool calls
                if rst.tool_calls is not None:
                    try:
                        tool_calls = json.loads(str(rst.tool_calls))
                        self.messages.append({
                            "role": rst.role,
                            "tool_calls": tool_calls
                        })
                    except json.JSONDecodeError:
                        logger.error(f"Failed to decode tool_calls JSON: {rst.tool_calls}")
                # Tool result messages
                elif rst.tool_call_id is not None:
                    self.messages.append({
                        "role": "tool",
                        "tool_call_id": str(rst.tool_call_id),
                        "content": str(rst.content) if rst.content is not None else ""
                    })
                # Plain messages
                else:
                    self.messages.append({
                        "role": rst.role,
                        "content": str(rst.content) if rst.content is not None else ""
                    })
            logger.info(f"Loaded {len(results)} messages from memory")

    async def elicitation_handler(self, message: str, response_type: type, params: ElicitRequestParams, context: RequestContext):
        print(f"MCP Server asks: {message}")
        user_decision = input("Please check(yes or no): ").strip()
        if not user_decision or user_decision != "yes":
            return ElicitResult(action="decline")

        # Returning the dataclass instance implies action="accept"
        response_data = response_type(decision="accept")
        return response_data

    async def process_query(self, query: str) -> str:
        """Process a user query by interacting with the MCP server and LLM.

        Args:
            query (str): The user query to process.
        Returns:
            str: The response from the MCP server.
        """
        # Append the user query to the message history
        self.messages.append({
            "role": "user",
            "content": query,
        })
        await MCPMemoryService.add_memory(thread_id=self.thread_id, role="user", content=query)
        # The async context manager guarantees the MCP connection is opened and closed properly
        async with self.mcp_client:
            # Fetch the list of available tools from the MCP server
            tools = await self.mcp_client.list_tools()
            # Convert MCP tools into the OpenAI function-calling format
            available_tools = []
            for tool in tools:
                available_tools.append({
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.inputSchema,
                    }
                })
            logger.info(f"Available tools: {[tool['function']['name'] for tool in available_tools]}")
            # Call the LLM with the conversation history and the available tools
            resp = await self.llm.chat.completions.create(
                model=cfg.llm_model,
                messages=self.messages,
                tools=available_tools,
                temperature=0.3,
            )
            # Collected response text
            final_text = []
            # First response message from the LLM
            message = resp.choices[0].message
            # If the response carries direct content, add it to the result
            if hasattr(message, "content") and message.content:
                final_text.append(message.content)
            # Keep resolving tool calls until the LLM stops requesting them
            while message.tool_calls:
                for tool_call in message.tool_calls:
                    # Skip tool calls without function information
                    if not hasattr(tool_call, "function"):
                        continue
                    # Cast to get the function call details
                    function_call = cast(ChatCompletionMessageFunctionToolCall, tool_call)
                    function = function_call.function
                    tool_name = function.name
                    # Parse the function arguments
                    tool_args = json.loads(function.arguments)
                    # Make sure the MCP client is still connected
                    if not self.mcp_client.is_connected():
                        raise RuntimeError("Session not initialized. Cannot call tool.")

                    # Invoke the tool on the MCP server
                    result = await self.mcp_client.call_tool(tool_name, tool_args)
                    # Record the assistant's tool call in the message history
                    self.messages.append({
                        "role": "assistant",
                        "tool_calls": [
                            {
                                "id": tool_call.id,
                                "type": "function",
                                "function": {
                                    "name": function.name,
                                    "arguments": function.arguments
                                }
                            }
                        ]
                    })
                    await MCPMemoryService.add_memory(
                        thread_id=self.thread_id,
                        role="assistant",
                        tool_calls=json.dumps([
                            {
                                "id": tool_call.id,
                                "type": "function",
                                "function": {
                                    "name": function.name,
                                    "arguments": function.arguments
                                }
                            }
                        ])
                    )
                    # Record the tool result in the message history
                    self.messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": str(result.content) if result.content else ""
                    })
                    await MCPMemoryService.add_memory(
                        thread_id=self.thread_id,
                        role="tool",
                        tool_call_id=tool_call.id,
                        content=str(result.content) if result.content else "",
                    )

                # Call the LLM again with the tool results
                final_resp = await self.llm.chat.completions.create(
                    model=cfg.llm_model,
                    messages=self.messages,
                    tools=available_tools,
                    temperature=0.3,
                )
                # Track the latest LLM response
                message = final_resp.choices[0].message
                # If it carries content, add it to the final result
                if message.content:
                    final_text.append(message.content)

            # Append the final assistant response to the message history
            if final_text:
                final_content = "\n".join(final_text)
                self.messages.append({
                    "role": "assistant",
                    "content": final_content
                })
                await MCPMemoryService.add_memory(
                    thread_id=self.thread_id,
                    role="assistant",
                    content=final_content
                )

            # Return the joined response text
            return "\n".join(final_text)

    async def chat_loop(self):
        """Main chat loop: read user input and print responses."""
        print("Welcome to the MCP chat! Type 'quit' to exit.")
        # Load historical memory
        await self.load_memory()
        if self.messages:
            print(f"Loaded {len(self.messages)} historical messages")

            # Replay the historical conversation
            for msg in self.messages:
                if msg["role"] == "user":
                    print(f"You (history): {msg['content']}")
                elif msg["role"] == "assistant":
                    if "content" in msg:
                        print(f"Assistant (history): {msg['content']}")
        # Keep handling user input until the user quits
        while True:
            try:
                query = input("You: ").strip()
                if query.lower() == "quit":
                    print("Exiting chat. Goodbye!")
                    break
                # Skip empty input
                if not query:
                    continue
                # Process the query and print the response
                resp = await self.process_query(query)
                print(f"Assistant: {resp}")

            # Catch and log any exception raised inside the loop
            except Exception as e:
                logger.error(f"Error in chat loop: {str(e)}")
                logger.error(traceback.format_exc())


async def main():
    """Program entry point."""
    await init_db()
    client = MCPHost(server_uri="http://localhost:8001/mcp", thread_id="1234")
    try:
        await client.chat_loop()
    except Exception as e:
        logger.error(f"Error in main: {str(e)}")
        logger.error(traceback.format_exc())
    finally:
        # Make sure the client connection is closed
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
```
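The history-loading logic in load_memory can be isolated as a pure function, which makes it easy to test on its own. The row dicts below are simplified stand-ins for the SQLAlchemy MCPMemory model, and the sample rows are illustrative:

```python
import json

def rows_to_messages(rows: list[dict]) -> list[dict]:
    """Rebuild OpenAI-style chat messages from stored memory rows.

    Each row is a dict with role/content/tool_calls/tool_call_id keys,
    mirroring the columns of the mcp_memory table."""
    messages = []
    for row in rows:
        if row.get("tool_calls"):
            # Assistant message that requested tool calls (stored as JSON text)
            messages.append({"role": row["role"],
                             "tool_calls": json.loads(row["tool_calls"])})
        elif row.get("tool_call_id"):
            # Tool result message, linked back to the call by its id
            messages.append({"role": "tool",
                             "tool_call_id": row["tool_call_id"],
                             "content": row.get("content") or ""})
        else:
            # Plain user/assistant/system message
            messages.append({"role": row["role"],
                             "content": row.get("content") or ""})
    return messages

# Hypothetical rows reproducing one tool-calling turn
rows = [
    {"role": "user", "content": "What's the weather in Hefei?"},
    {"role": "assistant", "tool_calls": json.dumps([
        {"id": "call_1", "type": "function",
         "function": {"name": "get_weather",
                      "arguments": json.dumps({"city": "Hefei"})}}])},
    {"role": "tool", "tool_call_id": "call_1",
     "content": "It's always sunny in Hefei!"},
    {"role": "assistant", "content": "It's always sunny in Hefei!"},
]
for m in rows_to_messages(rows):
    print(m)
```

Keeping the reconstruction pure like this separates the message-format concern from the database access, so it can be unit-tested without a database.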
Example client output from the first run:

```
python demo08-client.py
Welcome to the MCP chat! Type 'quit' to exit.
You: What's today's date?
Assistant: Today's date is September 14, 2025.
You: And tomorrow?
Assistant: Today is September 14, 2025, so tomorrow is September 15, 2025.
You: What's the weather like in Hefei?
Assistant: It's always sunny in Hefei!
You: quit
Exiting chat. Goodbye!
```
Output from the second run. You can see the AI picks up the earlier conversation:

```
Welcome to the MCP chat! Type 'quit' to exit.
Loaded 12 historical messages
You (history): What's today's date?
Assistant (history): Today's date is September 14, 2025.
You (history): And tomorrow?
Assistant (history): Today is September 14, 2025, so tomorrow is September 15, 2025.
You (history): What's the weather like in Hefei?
Assistant (history): It's always sunny in Hefei!
You: And Nanjing?
Assistant: It's always sunny in Nanjing too!
You: quit
Exiting chat. Goodbye!
```
Output from the third run. Tool calls and elicitation also work as expected:

```
Welcome to the MCP chat! Type 'quit' to exit.
Loaded 16 historical messages
You (history): What's today's date?
Assistant (history): Today's date is September 14, 2025.
You (history): And tomorrow?
Assistant (history): Today is September 14, 2025, so tomorrow is September 15, 2025.
You (history): What's the weather like in Hefei?
Assistant (history): It's always sunny in Hefei!
You (history): And Nanjing?
Assistant (history): It's always sunny in Nanjing too!
You: Check the current memory usage
Assistant: Current memory usage:
- **Total memory**: 62Gi
- **Used memory**: 16Gi
- **Free memory**: 38Gi
- **Shared memory**: 161Mi
- **Buff/cache**: 8.7Gi
- **Available memory**: 46Gi
Swap usage:
- **Total swap**: 3.8Gi
- **Used swap**: 0B
- **Free swap**: 3.8Gi
Overall, memory usage looks healthy, with plenty of memory still available.
You: Create a txt file in my home directory, named after today's date, containing the current memory usage and today's weather in Hefei
MCP Server asks: Do you want to execute this command(yes or no): echo "Memory usage:\ntotal used free shared buff/cache available\nMem: 62Gi 16Gi 38Gi 161Mi 8.7Gi 46Gi\nSwap: 3.8Gi 0B 3.8Gi\n\nWeather in Hefei:\nIt's always sunny in Hefei!" > ~/2025-09-14.txt
Please check(yes or no): yes
Assistant: The file `2025-09-14.txt` has been created in your home directory with the current memory usage and today's weather in Hefei.
You: quit
Exiting chat. Goodbye!
```
Summary
With this example we've built a complete MCP persistent memory system that gives the AI the ability to "remember" conversation history. Its core strengths:
- Persistent memory: conversation history is stored in a SQLite database and survives restarts, like giving the AI a reliable "memory disk".
- Thread isolation: thread_id separates different users' conversation histories, so each user's memory is independent.
- Complete records: not only plain dialogue but also tool calls and their results are stored, so the AI can fully reconstruct the context.
- Time tracking: every memory row carries timestamps, making later time-based analysis and management straightforward.
- Easy extension: the schema supports logical deletion and time-based partitioning, laying the groundwork for further features.
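As a sketch of the logical-deletion idea, the following marks rows older than a cutoff as deleted instead of removing them. It assumes a simplified version of the mcp_memory schema and uses the stdlib sqlite3 module rather than the client's SQLAlchemy setup; the hard-coded dates are illustrative:

```python
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")
# Simplified mcp_memory schema: just the columns the soft delete touches
conn.execute("""
    CREATE TABLE mcp_memory (
        id INTEGER PRIMARY KEY,
        thread_id TEXT,
        content TEXT,
        is_deleted INTEGER DEFAULT 0,
        created_at TEXT,
        deleted_at TEXT
    )
""")
conn.executemany(
    "INSERT INTO mcp_memory (thread_id, content, created_at) VALUES (?, ?, ?)",
    [
        ("1234", "old message", "2025-01-01 10:00:00"),
        ("1234", "recent message", "2025-09-14 10:00:00"),
    ],
)

def soft_delete_before(cutoff: str) -> int:
    """Flag rows older than cutoff as deleted; returns the number of rows flagged."""
    cur = conn.execute(
        "UPDATE mcp_memory SET is_deleted = 1, deleted_at = ? "
        "WHERE created_at < ? AND is_deleted = 0",
        (datetime.now().strftime("%Y-%m-%d %H:%M:%S"), cutoff),
    )
    conn.commit()
    return cur.rowcount

deleted = soft_delete_before("2025-06-01 00:00:00")
print(f"soft-deleted {deleted} row(s)")
```

Because get_memories filters on `is_deleted == False`, flagged rows disappear from the conversation while remaining in the table for auditing or recovery.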
With persistent memory in place, an MCP application can deliver a much more coherent and intelligent user experience: users no longer have to restate context every time, and the AI can give more personalized, precise answers based on past conversations.
In practice, this kind of persistent memory mechanism is especially useful in scenarios such as:
- Customer service: remembering a user's past issues and service records
- Personal assistants: remembering a user's preferences and habits
- Education: tracking learning progress and past Q&A
- Business analytics: keeping the analysis process and results for later reference
In short, persistent memory is a key building block of professional-grade AI applications: it upgrades the AI from a one-shot chat tool into an intelligent companion with memory.
Source: contributed by a 程序园 user.