Pregel的Checkpointer采用基于Checkpoint的持久化记录下一个以Thread ID标识的对话历史,这种基于会话的持久化属于短期存储。复杂的流程在运行的时候还需要跨越多个对话的长期甚至永久存储,这类存储被抽象成一个BaseStore类型,采用内存存储的InMemoryStore它的常用实现。
1. BaseStore
Pregel的长期存储不再仅仅是存储简单的键值对,它具有一个支持层次化命名空间,还支持基于向量检索的自然语言查询,以及生命周期管理(TTL)。- class BaseStore(ABC):
- supports_ttl: bool = False
- ttl_config: TTLConfig | None = None
- …
- class TTLConfig(TypedDict, total=False):
- refresh_on_read: bool
- default_ttl: float | None
- sweep_interval_minutes: int | None
复制代码 我们先来是说基于生命周期的TTL如何使用,存储是否提供针对TTL的支持可以利用其supports_ttl字段来判断,而ttl_config字段返回的TTLConfig提供了TTL的相关配置,它包括如下的配置项:
- refresh_on_read(bool):决定了 “读取” 动作是否会延长数据的寿命,类似于滑动窗口缓存机制。如果不显式配置,通常默认为True。当执行get或search方法时,系统会自动重置该数据条目的过期计时器。开启此选项适用于 “活跃用户会话” ,只要用户还在持续提问或交互,其长期记忆就一直保留。只有当用户长时间不使用时,数据才会被清理。否则数据寿命自创建起固定,无论读取多少次,到期即删。
- default_ttl(float | None):定义 数据项默认过期时间,单位为分钟。默认为None,意味着数据永久有效。当新的数据项被存入时,它会被赋予一个初始倒计时。为存储的数据设置一个合理的生命周期有助于防止数据库随着时间推移被无用的僵尸数据填满。
- sweep_interval_minutes(int | None):该配置决定了后端引擎清理过期数据的频率。默认为None,表示不执行定期扫描。系统会维护一个异步数据清除任务,以设定的频率扫描数据库,删除那些倒计时归零的条目。如果没有配置此项,某些后端可能只在读取时发现数据过期(惰性删除),而不会主动回收磁盘空间。对于InMemoryStore,这非常关键,定期清理能防止内存会持续膨胀。对于支持原生 TTL 的数据库(如 Redis),此配置可能被忽略,因为数据库引擎具有自己的清理策略。
数据项的存入利用如下所示的put/aput方法完成,数据以“键值对”+“命名空间”的形式被存储,其中值以字典的形式提供字段和值的映射关系,具有层级的命名空间以字符串元组的形式提供。ttl参数可以为存入的数据指定一个针对性的过期时间。
- class BaseStore(ABC):
- def put(
- self,
- namespace: tuple[str, ...],
- key: str,
- value: dict[str, Any],
- index: Literal[False] | list[str] | None = None,
- *,
- ttl: float | None | NotProvided = NOT_PROVIDED,
- ) -> None:
- async def aput(
- self,
- namespace: tuple[str, ...],
- key: str,
- value: dict[str, Any],
- index: Literal[False] | list[str] | None = None,
- *,
- ttl: float | None | NotProvided = NOT_PROVIDED,
- ) -> None
复制代码 put/aput方法的参数index决定该数据项是否进入向量索引。如果我们只是简单地存入数据而不指定index,这些数据就像锁在保险箱里,只能通过精确的命名空间和键取回。如果我们采用list[str]的形式之定义多个索引字段,系统会提取这些字段的内容利用LLM生成嵌入(Embedding)向量进行存储,那么后续就能针对它们进行语义搜索。
get/aget方法根据提供的命名空间+键 的组合将对应的数据项提取出来,refresh_ttl参数决定是否重置该数据项的过期时间。方法返回的是一个Item对象,我们可以从中提取数据项的键、值、命名空间以及创建和最近修改的时间戳。Item类型的dict方法将自身转换成字典,两个时间类型会转换成ISO格式的字符串,相当于提供序列化的能力(datetime对象能直接被序列化)。- class BaseStore(ABC):
- def get(
- self,
- namespace: tuple[str, ...],
- key: str,
- *,
- refresh_ttl: bool | None = None,
- ) -> Item | None
- async def aget(
- self,
- namespace: tuple[str, ...],
- key: str,
- *,
- refresh_ttl: bool | None = None,
- ) -> Item | None
- …
- class Item:
- __slots__ = ("value", "key", "namespace", "created_at", "updated_at")
- def dict(self) -> dict
- …
复制代码 BaseStore提供了用于查询的search/asearch方法,我们调用这两个方法是时必须指定命名空间前缀。我们可以利用query参数实施基于自然语言的查询,它会触发底层存储的向量相似度搜索。filter参数以字典的形式提供基于多个字段的组合查询条件。limit和offset参数作为典型的分页参数,前者指定本次搜索返回结果的最大条目数,后者指定在返回结果集之前要跳过的条目数量。至于refresh_ttl参数,同样是用于决定是否重置过期时间。- class BaseStore(ABC):
- def search(
- self,
- namespace_prefix: tuple[str, ...],
- /,
- *,
- query: str | None = None,
- filter: dict[str, Any] | None = None,
- limit: int = 10,
- offset: int = 0,
- refresh_ttl: bool | None = None,
- ) -> list[SearchItem]:
- async def asearch(
- self,
- namespace_prefix: tuple[str, ...],
- /,
- *,
- query: str | None = None,
- filter: dict[str, Any] | None = None,
- limit: int = 10,
- offset: int = 0,
- refresh_ttl: bool | None = None,
- ) -> list[SearchItem]
- …
- class SearchItem(Item):
- __slots__ = ("score",)
- def dict(self) -> dict:
- result = super().dict()
- result["score"] = self.score
- return result
复制代码 search/asearch方法返回的不是迭代器,而是一个固化的SearchItem列表。SearchItem继承自Item,字段score是该类型存在的唯一理由,它代表搜索结果与查询之间的相关程度。具体数值取决于底层的向量存储 ,通常会采用余弦相似度或欧氏距离。它的dict方法会在基类返回的字典上将score代表的相似度得分添加进去。
除了上述的这些常规方法,BaseStore还提供用于删除指定数据项的delete方法,和get/aget方法一样,我们调用此方法时需要指定准确的命名空间和键。list_namespaces/alist_namespaces方法会根据指定的前缀/后缀查询命名空间的。- class BaseStore(ABC):
- def delete(self, namespace: tuple[str, ...], key: str) -> None
- def list_namespaces(
- self,
- *,
- prefix: NamespacePath | None = None,
- suffix: NamespacePath | None = None,
- max_depth: int | None = None,
- limit: int = 100,
- offset: int = 0,
- ) -> list[tuple[str, ...]]
- async def alist_namespaces(
- self,
- *,
- prefix: NamespacePath | None = None,
- suffix: NamespacePath | None = None,
- max_depth: int | None = None,
- limit: int = 100,
- offset: int = 0,
- ) -> list[tuple[str, ...]]
- …
- NamespacePath = tuple[str | Literal["*"], ...]
复制代码 BaseStore的实现类型必须实现它的batch/abatch方法。这两个抽象方法也是BaseStore最核心的方法,因为其他的快捷方法(如get、put、search)在底层通常都是封装成一个单操作的batch来执行的。在分布式图计算中, 工作 Node可能位于不同的服务器,合并多个操作对于较少时延尤为重要。- class BaseStore(ABC):
- @abstractmethod
- def batch(self, ops: Iterable[Op]) -> list[Result]
- @abstractmethod
- async def abatch(self, ops: Iterable[Op]) -> list[Result]
- ...
- Op = GetOp | SearchOp | PutOp | ListNamespacesOp
- Result = Item | list[Item] | list[SearchItem] | list[tuple[str, ...]] | None
复制代码 被batch/abatch方法打包的四个操作(get、search、put和list_namespaces)分别定义成如下的类型,各自的字段基本上对应这个快捷方法的参数。batch/abatch返回的是元素类型为Result的列表。- class GetOp(NamedTuple):
- namespace: tuple[str, ...]
- key: str
- refresh_ttl: bool = True
- class SearchOp(NamedTuple):
- namespace_prefix: tuple[str, ...]
- filter: dict[str, Any] | None = None
- limit: int = 10
- offset: int = 0
- query: str | None = None
- refresh_ttl: bool = True
- class MatchCondition(NamedTuple):
- match_type: NamespaceMatchType
- path: NamespacePath
- class ListNamespacesOp(NamedTuple):
- match_conditions: tuple[MatchCondition, ...] | None = None
- max_depth: int | None = None
- limit: int = 100
- offset: int = 0
- class PutOp(NamedTuple):
- namespace: tuple[str, ...]
- key: str
- value: dict[str, Any] | None
- index: Literal[False] | list[str] | None =
- ttl: float | None = None
复制代码 2.InMemoryStore
InMemoryStore在内部利用一个字典来存储数据项(即众多具有命名空间的键值对),并针对这个结构提供单记录的读取和多记录的查询。由于需要提供自然语言查询功能,InMemoryStore需要利用语言模型将文本转换成多维嵌入向量的能力。
我们知道嵌入(Embedding)是语言模型的一项核心概念,它来源于拓扑学和几何学,它表示的是将一个低维的对象,“嵌入”到一个高维的连续空间中并保持其结构不变。基本的原理是:定义一个N维的语义空间,将文本根据其表达的语义转换成该空间的一个点,并通过N维的向量来表示。我们一般将这个语义空间称为嵌入空间,对应点的向量称为嵌入向量。嵌入的原则很明确,那就是让具有相似语义的文本对应的点在嵌入空间尽量靠近,否则使它们尽量远离。
如果我们将数据库记录相关的文本字段转换成对应的嵌入向量,那么实现基于自然语言的查询就很容易实现:将查询文本采用相同的嵌入规则转换成向量,以两个点在嵌入空间的距离(比如欧氏距离)或者向量之间的夹角(余弦相似度)作为相似语义的度量对数据项实施查询。我们将这种查询方式称为向量检索。InMemoryStore采用的是基于余弦相似度的检索方式。
初始化InMemoryStore对象的时候,需要提供一个IndexConfig对象来对采用的向量索引进行配置。IndexConfig的dims字段表示嵌入空间的温度,而fields字段提供参与嵌入转换的字段。嵌入功能可以由一个Embeddings对象提供,它利用两组方法embed_documents/aembed_documents和embed_query/aembed_query提供了针对待检索的文档(针对数据项)和查询文本实施嵌入。EmbeddingsFunc和AEmbeddingsFunc以可执行对象的形式提供了相同的功能表达。这三种形式对embed字段的设置都是支持的。- class InMemoryStore(BaseStore):
- def __init__(self, *, index: IndexConfig | None = None) -> None:
- class IndexConfig(TypedDict, total=False):
- dims: int
- embed: Embeddings | EmbeddingsFunc | AEmbeddingsFunc | str
- fields: list[str] | None
- class Embeddings(ABC):
- @abstractmethod
- def embed_documents(self, texts: list[str]) -> list[list[float]]:
- @abstractmethod
- def embed_query(self, text: str) -> list[float]:
- async def aembed_documents(self, texts: list[str]) -> list[list[float]]:
- async def aembed_query(self, text: str) -> list[float]:
- EmbeddingsFunc = Callable[[Sequence[str]], list[list[float]]]
- AEmbeddingsFunc = Callable[[Sequence[str]], Awaitable[list[list[float]]]]
复制代码 对于2.0之前的版本还支持将其设置为一个预定义的Embeddings名称,内部会调用langchain.embeddings.init_embeddings函数提取内部的映射得到对应的Embeddings对象,但是这个langchain.embeddings包已经与最新版本的LangChain不兼容了。指定的Embeddings应该与嵌入空间维度相匹配,而嵌入是语言模型提供的能力,我们可以根据选择的语言模型和版本指定对应的维度。比如我们可以通过如下的方式得到 “text-embedding-3-small” 这个模型采用的嵌入空间维度是1536。- from langchain_openai import OpenAIEmbeddings
- embeddings = OpenAIEmbeddings(
- model="text-embedding-3-small",
- api_key= ,
- base_url=<base_url>,
- )
- vector = embeddings.embed_query("Hello world")
- print(1536==len(vector))
复制代码 在如下的演示程序中,我们针对模型text-embedding-3-small创建了一个OpenAIEmbeddings,并据此创建了一个InMemoryStore对象。然后我们以命名空间profile.instrests存入了两个分别针对饮食和运动喜好的数据项。- from langchain_openai import OpenAIEmbeddings
- from langgraph.store.base import IndexConfig
- from langgraph.store.memory import InMemoryStore
- import json
- base_url = "base URL of you deployed model"
- model_name = "text-embedding-3-small"
- api_key ="your api key"
- embeddings = OpenAIEmbeddings(
- model="text-embedding-3-small",
- api_key=api_key,
- base_url=base_url,
- )
- store = InMemoryStore(
- index=IndexConfig(
- embed=embeddings,
- dims=1536
- )
- )
- namespace = ("profile", "interests")
- store.put(namespace, "foods", {"text": "I love eating spicy Sichuan food."})
- store.put(namespace, "sports", {"text": "My favorite sport is swimming."})
- results = store.search(namespace, query="What are my food preferences?", limit=1)
- for item in results:
- print(json.dumps(item.dict(), indent=2))
复制代码 我们指定命名空间调用InMemoryStore对象的search方法以自然语言的形式(“What are my food preferences?”)实施查询。由于我们只有两条数据,所以我们将limit参数设置为1返回匹配度最高的那一条,我们会得到希望的第一条数据。如下所示的是查询结果。- {
- "namespace": [
- "profile",
- "interests"
- ],
- "key": "foods",
- "value": {
- "text": "I love eating spicy Sichuan food."
- },
- "created_at": "2026-01-28T02:53:58.641680+00:00",
- "updated_at": "2026-01-28T02:53:58.641682+00:00",
- "score": 0.3216021020504674
- }
复制代码 3. 在Pregel中的使用
如果需要在Pregel中使用长期存储,需要在初始化Pregel对象的时候提供对应的BaseStore对象。由于Node处理函数不支持针对BaseStore的直接注入,我们还得像使用静态上下文和StreamWriter一样,从注入的RunnableConfig配置中得到当前的Runtime对象,它的store字段返回的就是我们指定的BaseStore对象。
我们使用上面创建用于存储用于个人喜好的InMemoryStore编写了如下这个演示程序,这次我们将用户ID放进命名空间({user_id}.{interests})。我们使用静态上下文类型User来传递用户ID,所以在初始化Pregel对象的时候,除了需要指定store之外,还需要将User类型设置为context_schema。- from langchain_openai import OpenAIEmbeddings
- from langgraph.store.base import BaseStore, IndexConfig
- from langgraph.store.memory import InMemoryStore
- from langgraph.types import RunnableConfig
- from langgraph.runtime import Runtime
- from langgraph.pregel import Pregel, NodeBuilder
- from langgraph.channels import LastValue
- class User:
- user_id: str
- def __init__(self, user_id:str):
- self.user_id = user_id
- def handle(query: str, config:RunnableConfig) -> str:
- runtime: Runtime = config["configurable"].get("__pregel_runtime")
- store:BaseStore = runtime.store
- user_id = runtime.context.user_id
- namespace = (user_id, "interests")
- results = store.search(namespace, query=query, limit=1)
- if len(results) > 0:
- return results[0].value["text"]
- return "No interests found."
- node =(NodeBuilder()
- .subscribe_only("query")
- .do(handle)
- .write_to("answer"))
- def build_store() -> BaseStore:
- embeddings = OpenAIEmbeddings(
- model="text-embedding-3-small",
- api_key="your api key",
- base_url=" base URL of you deployed model"
- )
- store = InMemoryStore(
- index=IndexConfig(
- embed=embeddings,
- dims=1536
- )
- )
- namespace = ("jyden", "interests")
- store.put(namespace, "foods", {"text": "I love eating spicy Sichuan food."})
- store.put(namespace, "sports", {"text": "My favorite sport is swimming."})
- return store
- app = Pregel(
- nodes={"body": node},
- channels={"query": LastValue(str), "answer": LastValue(str)},
- input_channels=["query"],
- output_channels=["answer"],
- store= build_store(),
- context_schema=User)
- result = app.invoke(
- input={"query": "What is my favorite sport?"},
- context=User(user_id="jyden"))
- assert result == {"answer": "My favorite sport is swimming."}
- result = app.invoke(
- input={"query": "What are my food preferences?"},
- context=User(user_id="jyden"))
- assert result == {"answer": "I love eating spicy Sichuan food."}
复制代码 调用Pregel对象时利用指定的静态上下文指定用户ID,并将针对用户喜好的查询文本写入输入Channel“query”,并触发唯一的用于响应查询的Node。在Node的处理函数中,我们直接将查询文本中作为参数,并从另一个RunnableConfig类型的参数中提取静态上下文和BaseStore对象。在从静态上下文得到用户ID后,我们用它生成对应的命名空间针对得到BaseStore对象实施查询,并返回最终的查询结果。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |