Pregel中的Node对应的类型为PregelNode。对于一个PregelNode对象来说,它最核心的部分就是绑定在它上面的一个可执行操作,它是抽象类Runnable的子类。在LangChain整个体系中,Runnable类型几乎无处不在,包括语言模型(不论是传统的LLM模型还是Chat模型)、实现RAG的Retriever和提示词模板等组件都是一个Runnable对象,实际上Pregel本身也是一个Runnable对象。LangChain的“Chain”就是由一组Runnable对象按照指定的顺序构建而成。Runnable如此重要,值得单开一个系列进行独立介绍,这里我们可用先将它理解成一个可执行对象,可用帮助我们执行定义Node的函数。- class PregelNode:
- bound: Runnable[Any, Any]
复制代码 1. 输入
PregelNode的channels和triggers字段表示作为输入和触发器的Channel的名称。由于ManagedValue也可用作为输入,所以channels字段也可用包含ManagerValue的名称,这里我们统称为Channel。同一个Channel可以同时作为输入和触发器,出现在这两个字段中。- class PregelNode:
- channels : str | list[str]
- triggers : list[str]
复制代码 对于单一的输入,Channel名称可以设置为字符串,也可以封装成列表,它们会影响Node处理函数的输入参数传递方式。对于前者(字符串),对应Channel的值会作为原始参数传递给Node的处理函数,后者则会封装成字典进行传递,字典的Key为Channel的名称。如下的演示实例体现了这一点。- from langgraph.channels import LastValue
- from langgraph.pregel import Pregel, NodeBuilder
- from langgraph.pregel._read import PregelNode
- def build_node(node_name:str)->PregelNode:
- channel_in = f"{node_name}_in"
- channel_out = f"{node_name}_out"
- node = (NodeBuilder()
- .do(lambda args:args)
- .write_to(channel_out)).build()
-
- node.triggers = [channel_in]
- node.channels = channel_in if node_name == "foo" else [channel_in]
- return node
- app = Pregel(
- nodes= {name: build_node(name) for name in ["foo","bar"]},
- channels= {
- "foo_in": LastValue(str),
- "bar_in": LastValue(str),
- "foo_out": LastValue(object),
- "bar_out": LastValue(object),
- },
- input_channels=["foo_in", "bar_in"],
- output_channels=["foo_out", "bar_out"])
- result = app.invoke(input={"foo_in":"foobar", "bar_in":"foobar"})
- assert result["foo_out"] == "foobar"
- assert result["bar_out"] == {'bar_in': 'foobar'}
复制代码 如果一个Channel被多个Node作为输入,只要该Channel被任一Node以列表的形式注册,引擎内部的对齐机制将统一使用字典作为所有Node处理函数的输入。比如我们按照如下的方式修改了上面的程序,是foo和bar两个Node都使用foobarChannel作为输入,该输入Channel在bar中以列表的形式进行了设置,以字符串形式设置的fooNode的处理函数的输入参数依然会编程字典。这是一个不为人知的细节。- from langgraph.channels import LastValue
- from langgraph.pregel import Pregel, NodeBuilder
- from langgraph.pregel._read import PregelNode
- def build_node(node_name:str)->PregelNode:
- node = (NodeBuilder()
- .do(lambda args:args)
- .write_to(node_name)).build()
- node.triggers = ["input"]
- node.channels = "input" if node_name == "foo" else ["input"]
- return node
- app = Pregel(
- nodes= {name: build_node(name) for name in ["foo","bar"]},
- channels= {
- "input": LastValue(str),
- "foo": LastValue(object),
- "bar": LastValue(object),
- },
- input_channels=["input"],
- output_channels=["foo", "bar"])
- result = app.invoke(input={"input":"foobar"})
- assert result["foo"] == {'input': 'foobar'}
- assert result["bar"] == {'input': 'foobar'}
复制代码 2. 输出
Node的执行结果依赖于PregelNode的writers字段返回的一组Runnable对象输出到对应的Channel,系统默认使用的是一个ChannelWrite。如代码片段所示,初始化ChannelWrite对象时需要提供一组ChannelWriteEntry或ChannelWriteTupleEntry来表示针对目标Channel的写入意图。另一个应用了@cached_property装饰器的flat_writers返回一组扁平化的Runnable对象以提供性能。- class PregelNode:
- writers : list[Runnable]
- @cached_property
- def flat_writers(self) -> list[Runnable]
复制代码- class ChannelWrite(RunnableCallable):
- writes: list[ChannelWriteEntry | ChannelWriteTupleEntry | Send]
- def __init__(
- self,
- writes: Sequence[ChannelWriteEntry | ChannelWriteTupleEntry | Send],
- *,
- tags: Sequence[str] | None = None,
- )
复制代码 ChannelWriteEntry的channel和value字段分别表示输出Channel名称和值。skip_none字段决定是否需要忽略None值,如果指定了mapper字段,value还会被它作进一步处理并最终生成输出到Channel的值。一个ChannelWriteEntry对应一个单一Channel的输出,而ChannelWriteTupleEntry可以完成针对多Channel的输出。具体来说,Node处理函数返回的对象可以绑定在它的value字段上,通过mapper提供的可执行对象映射为一个“二元组序列”,此二元组的两部分对应输出Channel的名称和值。ChannelWriteTupleEntry的static设置的三元组序列仅作静态分析用,可以忽略。- class ChannelWriteEntry(NamedTuple):
- channel: str
- value: Any = PASSTHROUGH
- skip_none: bool = False
- mapper: Callable | None = None
- class ChannelWriteTupleEntry(NamedTuple):
- mapper: Callable[[Any], Sequence[tuple[str, Any]] | None]
- value: Any = PASSTHROUGH
- static: Sequence[tuple[str, Any, str | None]] | None = None
- PASSTHROUGH = object()
复制代码 在如下所示的演示实例中,我们创建了一个包含唯一Node的Pregel对象,并创建了一个包含单一ChannelWrite对象的列表作为该Node的writers字段。此ChannelWrite的writes列表包含两个ChannelWriteEntry和一个ChannelWriteTupleEntry,它们分别完成了针对三个Channel的输出。- from langgraph.channels import LastValue
- from langgraph.pregel import Pregel
- from langgraph.pregel._read import PregelNode
- from langgraph.pregel._write import ChannelWrite, ChannelWriteEntry, ChannelWriteTupleEntry
- entry1 = ChannelWriteEntry(
- channel="foo",
- value= "123"
- )
- entry2 = ChannelWriteEntry(
- channel="bar",
- value= "456",
- mapper= lambda v: int(v)
- )
- tuple_entry = ChannelWriteTupleEntry(
- value= {"foo":"123", "bar":"456"},
- mapper= lambda v: [("baz", int(v["foo"]) + int(v["bar"]))]
- )
- node = PregelNode(
- triggers=["start"],
- channels=[],
- writers=[ChannelWrite(writes=[entry1, entry2, tuple_entry])]
- )
- app = Pregel(
- nodes={"body": node},
- channels={
- "start": LastValue(None),
- "foo": LastValue(str),
- "bar": LastValue(int),
- "baz": LastValue(int)
- },
- input_channels=["start"],
- output_channels=["foo", "bar", "baz"],
- )
- result = app.invoke(input={"start": None})
- assert result == {"foo": "123", "bar": 456, "baz": 579}
复制代码 对于通过PregelNode对象表示的Node来说,其bound字段返回的Runnable对象用于执行处理函数,操作执行的结果由writers列表的一组Runnable对象写入相应的Channel,这两个核心工作最终会被如下这个node属性合并。对于Pregel来说,该属性在功能上基本就代表了整个Node,这也是该属性如此命名的原因。- class PregelNode:
- @cached_property
- def node(self) -> Runnable[Any, Any] | None
复制代码 3. 输入映射
Node基于Channel的输入、触发和输出分别对应channels、triggers和writers字段。如果所有输入Channel读取的原始输入(以字典形式表示)和提交给处理函数的参数有出入,我们还可以利用mapper字段返回的可执行对象(Callable[[Any], Any])作进一步映射。- class PregelNode:
- mapper : Callable[[Any], Any] | None
复制代码 如下的演示程序通过Pregel的mapper字段设置为了一个Lambda表达式,将提供给Node处理函数处理的字典转换成元组。- from langgraph.channels import LastValue
- from langgraph.pregel import Pregel, NodeBuilder
- from langgraph.pregel._read import PregelNode
- from typing import Tuple
- node: PregelNode = (NodeBuilder()
- .subscribe_to("foo","bar")
- .do(lambda args:args)
- .write_to("output")).build()
- node.mapper = lambda args:tuple(args.values())
- app = Pregel(
- nodes={"body": node},
- channels={
- "foo": LastValue(str),
- "bar": LastValue(str),
- "output": LastValue(Tuple[str,str]),
- },
- input_channels=["foo","bar"],
- output_channels=["output"],
- )
- result = app.invoke(input={"foo":"hello", "bar":"world"})
- assert result["output"] == ("hello","world")
复制代码 4. 失败重试
Agent中的Node可能会涉及网络传输、数据检索等会导致瞬时错误的操作,失败后自动重试机制是确保可靠性的主要手段,我们可以利用PregelNode 的retry_policy字段设置相应的重试策略。具体的重试策略通过如下所示的RetryPolicy具名元组表示。RetryPolicy的max_attempts和initial_interval分别表示最大重试次数(包含初次调用)和第一次重试前的初始等待时间,单位为秒。如果没有为Node设置针对性的重试策略,Pregel的retry_policy字段设置的重试策略将作为兜底。- class PregelNode:
- retry_policy : Sequence[RetryPolicy] | None
- class RetryPolicy(NamedTuple):
- initial_interval: float = 0.5
- backoff_factor: float = 2.0
- max_interval: float = 128.0
- max_attempts: int = 3
- jitter: bool = True
- retry_on: (
- type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool]
- ) = default_retry_on
- class Pregel(
- PregelProtocol[StateT, ContextT, InputT, OutputT],
- Generic[StateT, ContextT, InputT, OutputT]):
- retry_policy : Sequence[RetryPolicy] = ()
复制代码 重试策略采用基于“间隔倍增”的退避机制(Back off),也就是下次重试等待时间是前一次等待的N倍,这个倍数通过backoff_factor字段来提供,max_interval字段为等待实现设置了上限。为了防止多个并发Node同时重试而产生“惊群效应”,我们可以在重试间隔中添加随机抖动,jitter是这一特性的开关。调用失败有很多原因,重试在任何错误场景中都有意义,我们可以利用retry_on字段设置为重置设置前置条件。该字段的默认值对应如下这个default_retry_on函数。- def default_retry_on(exc: Exception) -> bool:
- import httpx
- import requests
- if isinstance(exc, ConnectionError):
- return True
- if isinstance(exc, httpx.HTTPStatusError):
- return 500 <= exc.response.status_code < 600
- if isinstance(exc, requests.HTTPError):
- return 500 <= exc.response.status_code < 600 if exc.response else True
- if isinstance(
- exc,
- (
- ValueError,
- TypeError,
- ArithmeticError,
- ImportError,
- LookupError,
- NameError,
- SyntaxError,
- RuntimeError,
- ReferenceError,
- StopIteration,
- StopAsyncIteration,
- OSError,
- ),
- ):
- return False
- return True
复制代码 5. 结果缓存
如果Node绑定一个相对耗时的计算,并且结果完全由给定的输入决定,那么针对输入对结果予以缓存无疑是改善时延的好办法。基于结果的缓存可以通过PregelNode 的cache_policy字段返回的缓存策略来控制。缓存策略通过CachePolicy类型表示,它具有key_func和ttl两个字段,前者提供一个用于解析缓存键(字符串或者字节数组)的可执行对象,后者用于设置缓存过期时间(如果没有显示设置,意味着永不过期)。- from langgraph.channels import LastValue
- from langgraph.types import RetryPolicy
- from langgraph.pregel import Pregel, NodeBuilder
- from langgraph.pregel._read import PregelNode
- from typing import Any
- def get_handler():
- times = 0
- def handle(args: dict[str, Any]) -> str:
- nonlocal times
- times += 1
- if times < 3:
- raise Exception("manually thrown error.")
- return "Success"
- return handle
- def build_node(max_attempts: int) -> PregelNode:
- node = (
- NodeBuilder().subscribe_to("start").do(get_handler()).write_to("output")
- ).build()
- node.retry_policy = [RetryPolicy(max_attempts=max_attempts)]
- return node
- def build_pregel(max_attempts: int) -> Pregel:
- return Pregel(
- nodes={"body": build_node(max_attempts)},
- channels={
- "start": LastValue(None),
- "output": LastValue(str),
- },
- input_channels=["start"],
- output_channels=["output"],
- )
- app = build_pregel(max_attempts=2)
- try:
- app.invoke(input={"start": None})
- assert False, "Expected an exception but none was raised."
- except Exception as e:
- assert str(e) == "manually thrown error."
- app = build_pregel(max_attempts=3)
- result = app.invoke(input={"start": None})
- assert result["output"] == "Success"
复制代码 对结果实施缓存的前提是需要将输入的“指纹”作为缓存键,这里的缓存键根据通过INPUT_CACHE_KEY_TYPE定义的二元组进行计算,该二元组前半部分提供的可执行对象相当于一个哈希函数,能够将原始内容转换成“指纹”;后者提供的多元组以路径的方式唯一标识当前的节点。CachePolicy的key_func字段对应的可执行对象将会作为INPUT_CACHE_KEY_TYPE二元组的前半部分,从定义可以看出默认设置的default_cache_key函数会利用pickle以序列化的方式将原始输入转换成字节作为指纹。该指纹和二元组后半部分合并称为Node执行结果缓存项的Key。
这里之所以需要强制使用字符串或者字节来表示缓存键,是因为Pregel针对缓存的实现并不限于内存存储,原则上可以与任意的内存数据库进行整合(比如redis)。缓存存储在Pregel中通过如下这个抽象基类BaseCache表示,它定义了一系列的抽象方法完成针对缓存的读取、写入和清除,我们可以通过派生此基类实现自定义的缓存存储。开发测试阶段我们经常会使用基于内存存储的InMemoryCache。如果没有对Node的缓存策略作针对性设置,在Pregel对象上设置的缓存策略将作为兜底。- class PregelNode:
- cache_policy : CachePolicy | None
- @cached_property
- def input_cache_key(self) -> INPUT_CACHE_KEY_TYPE
- @dataclass(**_DC_KWARGS)
- class CachePolicy(Generic[KeyFuncT]):
- key_func: KeyFuncT = default_cache_key
- ttl: int | None = None
- KeyFuncT = TypeVar("KeyFuncT", bound=Callable[..., str | bytes])
- INPUT_CACHE_KEY_TYPE = tuple[Callable[..., Any], tuple[str, ...]]
- def default_cache_key(*args: Any, **kwargs: Any) -> str | bytes:
- return pickle.dumps((_freeze(args), _freeze(kwargs)), protocol=5, fix_imports=False)
复制代码 在如下所示的演示程序中,对于创建的Pregel的唯一Node,它虽然具有两个输入Channel(foo和bar),但是对应的处理函数会将当前时间戳作为返回结果。我们为该Node设置了缓存策略,并将过期时间设置为30秒。- class BaseCache(ABC, Generic[ValueT]):
- serde: SerializerProtocol = JsonPlusSerializer(pickle_fallback=True)
- def __init__(self, *, serde: SerializerProtocol | None = None) -> None:
- self.serde = serde or self.serde
- @abstractmethod
- def get(self, keys: Sequence[FullKey]) -> dict[FullKey, ValueT]:
- @abstractmethod
- async def aget(self, keys: Sequence[FullKey]) -> dict[FullKey, ValueT]:
- @abstractmethod
- def set(self, pairs: Mapping[FullKey, tuple[ValueT, int | None]]) -> None:
- @abstractmethod
- async def aset(self, pairs: Mapping[FullKey, tuple[ValueT, int | None]]) -> None:
- @abstractmethod
- def clear(self, namespaces: Sequence[Namespace] | None = None) -> None:
- @abstractmethod
- async def aclear(self, namespaces: Sequence[Namespace] | None = None) -> None:
- class Pregel(
- PregelProtocol[StateT, ContextT, InputT, OutputT],
- Generic[StateT, ContextT, InputT, OutputT]):
- cache: BaseCache | None = None
- cache_policy: CachePolicy | None = None
复制代码 我们以5秒为间隔调用了Pregel对象三次,前两次使用相同的输入({"foo":"abc", "bar":"xyz"})。三次调用的时间戳和输入输出会以如下的形式打印出来,我们可以清晰地看到前两次由于提供了相同的参数,所以得到了相同的结果,很明显第二次得到的是缓存的结果。- from langgraph.channels import LastValue
- from langgraph.types import CachePolicy
- from langgraph.pregel import Pregel,NodeBuilder
- import datetime,time
- from langgraph.cache.memory import InMemoryCache
- node = (NodeBuilder()
- .subscribe_to("foo","bar")
- .do(lambda _: datetime.datetime.now())
- .write_to("output")).build()
- node.cache_policy = CachePolicy(ttl=30)
- app = Pregel(
- nodes={"body": node},
- cache=InMemoryCache(),
- channels={
- "foo": LastValue(str),
- "bar": LastValue(str),
- "output": LastValue(str),
- },
- input_channels=["foo","bar"],
- output_channels=["output"])
- input = {"foo":"abc", "bar":"xyz"}
- result = app.invoke(input=input)
- print(f"[{datetime.datetime.now()}]{input} -> {result['output']}")
- time.sleep(5)
- result = app.invoke(input=input)
- print(f"[{datetime.datetime.now()}]{input} -> {result['output']}")
- time.sleep(5)
- input = {"foo":"xyz", "bar":"abc"}
- result = app.invoke(input=input)
- print(f"[{datetime.datetime.now()}]{input} -> {result['output']}")
复制代码 6.补遗
前面已经介绍了PregelNode类型的大部分核心成员,对于如下几个遗漏的成员,我们在这里作一下概况性介绍。tags使我们可以在Node上打上相应的标签,而metadata则可以在它上面附加任意的元数据。- [2026-01-31 23:51:20.178285]{'foo': 'abc', 'bar': 'xyz'} -> 2026-01-31 23:51:20.177192
- [2026-01-31 23:51:25.180527]{'foo': 'abc', 'bar': 'xyz'} -> 2026-01-31 23:51:20.177192
- [2026-01-31 23:51:30.183805]{'foo': 'xyz', 'bar': 'abc'} -> 2026-01-31 23:51:30.182490
复制代码 Node结合边构成了图,而图本身也可以作为一个Node参与构建一个更大的图,所以图具有一个嵌套的层级结构,一个Node可以包含一组子图,体现在subgraphs字段上。方法copy对返回自身的一个浅拷贝,至于四个方法(invoke、ainvoke、stream和astream)实现的两种调用模式,最终还是通过调用bound字段的Runnable对象的同名方法实现的。
我们最后使用最简单的语言对Pregel做一个总结:我们可以将 PregelNode 想象成一个智能反应堆,其中triggers是点火装置(决定什么时候开始),channels是原料管道(输入数据),mapper 是入料加工(数据预处理),bound 是核心反应室(业务逻辑),writers 是成品输送带(更新状态)。
7. NodeBuilder
为了让大家对表示Node的PregelNode类型有深入地理解,在前面的演示中我们大都采用直接对其字段进行设置的方式,实际上在真正的开发中基本不会这么做,而是选择使用NodeBuilder来构建它,后者提供更加精简的API。- class PregelNode:
- tags : Sequence[str] | None
- metadata : Mapping[str, Any] | None
- subgraphs : Sequence[PregelProtocol]
- def copy(self, update: dict[str, Any]) -> PregelNode
- def invoke(
- self,
- input: Any,
- config: RunnableConfig | None = None,
- **kwargs: Any | None,
- ) -> Any
- async def ainvoke(
- self,
- input: Any,
- config: RunnableConfig | None = None,
- **kwargs: Any | None,
- ) -> Any
- def stream(
- self,
- input: Any,
- config: RunnableConfig | None = None,
- **kwargs: Any | None,
- ) -> Iterator[Any]
- async def astream(
- self,
- input: Any,
- config: RunnableConfig | None = None,
- **kwargs: Any | None,
- ) -> AsyncIterator[Any]
复制代码 如果构建的Node只需定义一个Channel,我们可以调用subscribe_only方法,它将以字符串的(不是列表)形式赋值给channels字段。subscribe_to方法默认会将指定的Channel同时添加到triggers和channels列表中,如果将read参数设置为False,指定的Channel只会作为输入添加到channels列表中。read_from方法指定的仅仅是输入Channel,所以只会添加到channels列表中。
如果自行构建PregelNode,针对Channel的输出会很麻烦,使用NodeBuilder的write_to方法就简单多了,我们只需要指定输出Channel的名称列表就可以了。如果需要多输出作更细粒度的控制,也可以指定一组ChannelWriteEntry对象。我们也可以利用write_to方法提供的关键字参数针对Channel的写入(此时参数名会作为输出Channel的名称),其类型_WriteValue定义如下,所以我们可以使用兼容的Lambda表达式简单快捷地完成输出。- class NodeBuilder:
- def subscribe_only(
- self,
- channel: str,
- ) -> Self
- def subscribe_to(
- self,
- *channels: str,
- read: bool = True,
- ) -> Self
- def read_from(
- self,
- *channels: str,
- ) -> Self
- def write_to(
- self,
- *channels: str | ChannelWriteEntry,
- **kwargs: _WriteValue,
- ) -> Self
- def meta(self, *tags: str, **metadata: Any) -> Self
- def add_retry_policies(self, *policies: RetryPolicy) -> Self
- def add_cache_policy(self, policy: CachePolicy) -> Self
- def build(self) -> PregelNode
复制代码 前面介绍的打标签和附加元数据的功能可以调用NodeBuilder的meta方法来完成,失败重试和结果缓存策略则由add_retry_policies和add_cache_policy方法来提供。等所有设置完成之后,我们直接调用build方法将目标Node构建出来。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |