找回密码
 立即注册
首页 业界区 业界 [拆解LangChain执行引擎] PregelNode——无状态的功能节 ...

[拆解LangChain执行引擎] PregelNode——无状态的功能节点

铜坠匍 5 天前
Pregel中的Node对应的类型为PregelNode。对于一个PregelNode对象来说,它最核心的部分就是绑定在它上面的一个可执行操作,它是抽象类Runnable的子类。在LangChain整个体系中,Runnable类型几乎无处不在,包括语言模型(不论是传统的LLM模型还是Chat模型)、实现RAG的Retriever和提示词模板等组件都是一个Runnable对象,实际上Pregel本身也是一个Runnable对象。LangChain的“Chain”就是由一组Runnable对象按照指定的顺序构建而成。Runnable如此重要,值得单开一个系列进行独立介绍,这里我们可用先将它理解成一个可执行对象,可用帮助我们执行定义Node的函数。
  1. class PregelNode:
  2.     bound: Runnable[Any, Any]
复制代码
1. 输入

PregelNode的channels和triggers字段表示作为输入和触发器的Channel的名称。由于ManagedValue也可用作为输入,所以channels字段也可用包含ManagerValue的名称,这里我们统称为Channel。同一个Channel可以同时作为输入和触发器,出现在这两个字段中。
  1. class PregelNode:
  2.     channels : str | list[str]
  3.     triggers : list[str]
复制代码
对于单一的输入,Channel名称可以设置为字符串,也可以封装成列表,它们会影响Node处理函数的输入参数传递方式。对于前者(字符串),对应Channel的值会作为原始参数传递给Node的处理函数,后者则会封装成字典进行传递,字典的Key为Channel的名称。如下的演示实例体现了这一点。
  1. from langgraph.channels import LastValue
  2. from langgraph.pregel import Pregel, NodeBuilder
  3. from langgraph.pregel._read import PregelNode
  4. def build_node(node_name:str)->PregelNode:
  5.     channel_in = f"{node_name}_in"
  6.     channel_out = f"{node_name}_out"
  7.     node = (NodeBuilder()
  8.         .do(lambda args:args)
  9.         .write_to(channel_out)).build()
  10.    
  11.     node.triggers = [channel_in]
  12.     node.channels = channel_in if node_name == "foo" else [channel_in]
  13.     return node
  14. app = Pregel(
  15.     nodes= {name: build_node(name) for name in ["foo","bar"]},
  16.     channels= {
  17.         "foo_in": LastValue(str),
  18.         "bar_in": LastValue(str),
  19.         "foo_out": LastValue(object),
  20.         "bar_out": LastValue(object),
  21.     },
  22.     input_channels=["foo_in", "bar_in"],
  23.     output_channels=["foo_out", "bar_out"])
  24. result = app.invoke(input={"foo_in":"foobar", "bar_in":"foobar"})
  25. assert result["foo_out"] == "foobar"
  26. assert result["bar_out"] == {'bar_in': 'foobar'}
复制代码
如果一个Channel被多个Node作为输入,只要该Channel被任一Node以列表的形式注册,引擎内部的对齐机制将统一使用字典作为所有Node处理函数的输入。比如我们按照如下的方式修改了上面的程序,是foo和bar两个Node都使用foobarChannel作为输入,该输入Channel在bar中以列表的形式进行了设置,以字符串形式设置的fooNode的处理函数的输入参数依然会编程字典。这是一个不为人知的细节。
  1. from langgraph.channels import LastValue
  2. from langgraph.pregel import Pregel, NodeBuilder
  3. from langgraph.pregel._read import PregelNode
  4. def build_node(node_name:str)->PregelNode:
  5.     node = (NodeBuilder()
  6.         .do(lambda args:args)
  7.         .write_to(node_name)).build()   
  8.     node.triggers = ["input"]
  9.     node.channels = "input" if node_name == "foo" else ["input"]
  10.     return node
  11. app = Pregel(
  12.     nodes= {name: build_node(name) for name in ["foo","bar"]},
  13.     channels= {
  14.         "input": LastValue(str),
  15.         "foo": LastValue(object),
  16.         "bar": LastValue(object),
  17.     },
  18.     input_channels=["input"],
  19.     output_channels=["foo", "bar"])
  20. result = app.invoke(input={"input":"foobar"})
  21. assert result["foo"] == {'input': 'foobar'}
  22. assert result["bar"] == {'input': 'foobar'}
复制代码
2. 输出

Node的执行结果依赖于PregelNode的writers字段返回的一组Runnable对象输出到对应的Channel,系统默认使用的是一个ChannelWrite。如代码片段所示,初始化ChannelWrite对象时需要提供一组ChannelWriteEntry或ChannelWriteTupleEntry来表示针对目标Channel的写入意图。另一个应用了@cached_property装饰器的flat_writers返回一组扁平化的Runnable对象以提供性能。
  1. class PregelNode:
  2.     writers : list[Runnable]
  3.     @cached_property
  4.     def flat_writers(self) -> list[Runnable]
复制代码
  1. class ChannelWrite(RunnableCallable):
  2.     writes: list[ChannelWriteEntry | ChannelWriteTupleEntry | Send]
  3.     def __init__(
  4.         self,
  5.         writes: Sequence[ChannelWriteEntry | ChannelWriteTupleEntry | Send],
  6.         *,
  7.         tags: Sequence[str] | None = None,
  8.     )
复制代码
ChannelWriteEntry的channel和value字段分别表示输出Channel名称和值。skip_none字段决定是否需要忽略None值,如果指定了mapper字段,value还会被它作进一步处理并最终生成输出到Channel的值。一个ChannelWriteEntry对应一个单一Channel的输出,而ChannelWriteTupleEntry可以完成针对多Channel的输出。具体来说,Node处理函数返回的对象可以绑定在它的value字段上,通过mapper提供的可执行对象映射为一个“二元组序列”,此二元组的两部分对应输出Channel的名称和值。ChannelWriteTupleEntry的static设置的三元组序列仅作静态分析用,可以忽略。
  1. class ChannelWriteEntry(NamedTuple):
  2.     channel: str
  3.     value: Any = PASSTHROUGH
  4.     skip_none: bool = False
  5.     mapper: Callable | None = None
  6. class ChannelWriteTupleEntry(NamedTuple):
  7.     mapper: Callable[[Any], Sequence[tuple[str, Any]] | None]
  8.     value: Any = PASSTHROUGH
  9.     static: Sequence[tuple[str, Any, str | None]] | None = None
  10. PASSTHROUGH = object()
复制代码
在如下所示的演示实例中,我们创建了一个包含唯一Node的Pregel对象,并创建了一个包含单一ChannelWrite对象的列表作为该Node的writers字段。此ChannelWrite的writes列表包含两个ChannelWriteEntry和一个ChannelWriteTupleEntry,它们分别完成了针对三个Channel的输出。
  1. from langgraph.channels import LastValue
  2. from langgraph.pregel import Pregel
  3. from langgraph.pregel._read import PregelNode
  4. from langgraph.pregel._write import ChannelWrite, ChannelWriteEntry, ChannelWriteTupleEntry
  5. entry1 = ChannelWriteEntry(
  6.     channel="foo",
  7.     value= "123"
  8. )
  9. entry2 = ChannelWriteEntry(
  10.     channel="bar",
  11.     value= "456",
  12.     mapper= lambda v: int(v)
  13. )
  14. tuple_entry = ChannelWriteTupleEntry(
  15.     value= {"foo":"123", "bar":"456"},
  16.     mapper= lambda v: [("baz", int(v["foo"]) + int(v["bar"]))]
  17. )
  18. node = PregelNode(
  19.     triggers=["start"],
  20.     channels=[],
  21.     writers=[ChannelWrite(writes=[entry1, entry2, tuple_entry])]
  22. )
  23. app = Pregel(
  24.     nodes={"body": node},
  25.     channels={
  26.         "start": LastValue(None),
  27.         "foo": LastValue(str),
  28.         "bar": LastValue(int),
  29.         "baz": LastValue(int)
  30.     },
  31.     input_channels=["start"],
  32.     output_channels=["foo", "bar", "baz"],
  33. )
  34. result = app.invoke(input={"start": None})
  35. assert result == {"foo": "123", "bar": 456, "baz": 579}
复制代码
对于通过PregelNode对象表示的Node来说,其bound字段返回的Runnable对象用于执行处理函数,操作执行的结果由writers列表的一组Runnable对象写入相应的Channel,这两个核心工作最终会被如下这个node属性合并。对于Pregel来说,该属性在功能上基本就代表了整个Node,这也是该属性如此命名的原因。
  1. class PregelNode:   
  2.     @cached_property
  3.     def node(self) -> Runnable[Any, Any] | None
复制代码
3. 输入映射

Node基于Channel的输入、触发和输出分别对应channels、triggers和writers字段。如果所有输入Channel读取的原始输入(以字典形式表示)和提交给处理函数的参数有出入,我们还可以利用mapper字段返回的可执行对象(Callable[[Any], Any])作进一步映射。
  1. class PregelNode:
  2.     mapper         : Callable[[Any], Any] | None
复制代码
如下的演示程序通过Pregel的mapper字段设置为了一个Lambda表达式,将提供给Node处理函数处理的字典转换成元组。
  1. from langgraph.channels import LastValue
  2. from langgraph.pregel import Pregel, NodeBuilder
  3. from langgraph.pregel._read import PregelNode
  4. from typing import Tuple
  5. node: PregelNode = (NodeBuilder()
  6.     .subscribe_to("foo","bar")
  7.     .do(lambda args:args)
  8.     .write_to("output")).build()   
  9. node.mapper = lambda args:tuple(args.values())
  10. app = Pregel(
  11.     nodes={"body": node},
  12.     channels={
  13.         "foo": LastValue(str),
  14.         "bar": LastValue(str),
  15.         "output": LastValue(Tuple[str,str]),
  16.     },
  17.     input_channels=["foo","bar"],
  18.     output_channels=["output"],
  19. )
  20. result = app.invoke(input={"foo":"hello", "bar":"world"})
  21. assert result["output"] == ("hello","world")
复制代码
4. 失败重试

Agent中的Node可能会涉及网络传输、数据检索等会导致瞬时错误的操作,失败后自动重试机制是确保可靠性的主要手段,我们可以利用PregelNode 的retry_policy字段设置相应的重试策略。具体的重试策略通过如下所示的RetryPolicy具名元组表示。RetryPolicy的max_attempts和initial_interval分别表示最大重试次数(包含初次调用)和第一次重试前的初始等待时间,单位为秒。如果没有为Node设置针对性的重试策略,Pregel的retry_policy字段设置的重试策略将作为兜底。
  1. class PregelNode:
  2. retry_policy : Sequence[RetryPolicy] | None
  3. class RetryPolicy(NamedTuple):
  4.     initial_interval: float = 0.5
  5.     backoff_factor: float = 2.0
  6.     max_interval: float = 128.0
  7.     max_attempts: int = 3
  8.     jitter: bool = True
  9.     retry_on: (
  10.         type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool]
  11.     ) = default_retry_on
  12. class Pregel(
  13.     PregelProtocol[StateT, ContextT, InputT, OutputT],
  14.     Generic[StateT, ContextT, InputT, OutputT]):
  15. retry_policy : Sequence[RetryPolicy] = ()
复制代码
重试策略采用基于“间隔倍增”的退避机制(Back off),也就是下次重试等待时间是前一次等待的N倍,这个倍数通过backoff_factor字段来提供,max_interval字段为等待实现设置了上限。为了防止多个并发Node同时重试而产生“惊群效应”,我们可以在重试间隔中添加随机抖动,jitter是这一特性的开关。调用失败有很多原因,重试在任何错误场景中都有意义,我们可以利用retry_on字段设置为重置设置前置条件。该字段的默认值对应如下这个default_retry_on函数。
  1. def default_retry_on(exc: Exception) -> bool:
  2.     import httpx
  3.     import requests
  4.     if isinstance(exc, ConnectionError):
  5.         return True
  6.     if isinstance(exc, httpx.HTTPStatusError):
  7.         return 500 <= exc.response.status_code < 600
  8.     if isinstance(exc, requests.HTTPError):
  9.         return 500 <= exc.response.status_code < 600 if exc.response else True
  10.     if isinstance(
  11.         exc,
  12.         (
  13.             ValueError,
  14.             TypeError,
  15.             ArithmeticError,
  16.             ImportError,
  17.             LookupError,
  18.             NameError,
  19.             SyntaxError,
  20.             RuntimeError,
  21.             ReferenceError,
  22.             StopIteration,
  23.             StopAsyncIteration,
  24.             OSError,
  25.         ),
  26.     ):
  27.         return False
  28.     return True
复制代码
5. 结果缓存

如果Node绑定一个相对耗时的计算,并且结果完全由给定的输入决定,那么针对输入对结果予以缓存无疑是改善时延的好办法。基于结果的缓存可以通过PregelNode 的cache_policy字段返回的缓存策略来控制。缓存策略通过CachePolicy类型表示,它具有key_func和ttl两个字段,前者提供一个用于解析缓存键(字符串或者字节数组)的可执行对象,后者用于设置缓存过期时间(如果没有显示设置,意味着永不过期)。
  1. from langgraph.channels import LastValue
  2. from langgraph.types import RetryPolicy
  3. from langgraph.pregel import Pregel, NodeBuilder
  4. from langgraph.pregel._read import PregelNode
  5. from typing import Any
  6. def get_handler():
  7.     times = 0
  8.     def handle(args: dict[str, Any]) -> str:
  9.         nonlocal times
  10.         times += 1
  11.         if times < 3:
  12.             raise Exception("manually thrown error.")
  13.         return "Success"
  14.     return handle
  15. def build_node(max_attempts: int) -> PregelNode:
  16.     node = (
  17.         NodeBuilder().subscribe_to("start").do(get_handler()).write_to("output")
  18.     ).build()
  19.     node.retry_policy = [RetryPolicy(max_attempts=max_attempts)]
  20.     return node
  21. def build_pregel(max_attempts: int) -> Pregel:
  22.     return Pregel(
  23.         nodes={"body": build_node(max_attempts)},
  24.         channels={
  25.             "start": LastValue(None),
  26.             "output": LastValue(str),
  27.         },
  28.         input_channels=["start"],
  29.         output_channels=["output"],
  30.     )
  31. app = build_pregel(max_attempts=2)
  32. try:
  33.     app.invoke(input={"start": None})
  34.     assert False, "Expected an exception but none was raised."
  35. except Exception as e:
  36.     assert str(e) == "manually thrown error."
  37. app = build_pregel(max_attempts=3)
  38. result = app.invoke(input={"start": None})
  39. assert result["output"] == "Success"
复制代码
对结果实施缓存的前提是需要将输入的“指纹”作为缓存键,这里的缓存键根据通过INPUT_CACHE_KEY_TYPE定义的二元组进行计算,该二元组前半部分提供的可执行对象相当于一个哈希函数,能够将原始内容转换成“指纹”;后者提供的多元组以路径的方式唯一标识当前的节点。CachePolicy的key_func字段对应的可执行对象将会作为INPUT_CACHE_KEY_TYPE二元组的前半部分,从定义可以看出默认设置的default_cache_key函数会利用pickle以序列化的方式将原始输入转换成字节作为指纹。该指纹和二元组后半部分合并称为Node执行结果缓存项的Key。
这里之所以需要强制使用字符串或者字节来表示缓存键,是因为Pregel针对缓存的实现并不限于内存存储,原则上可以与任意的内存数据库进行整合(比如redis)。缓存存储在Pregel中通过如下这个抽象基类BaseCache表示,它定义了一系列的抽象方法完成针对缓存的读取、写入和清除,我们可以通过派生此基类实现自定义的缓存存储。开发测试阶段我们经常会使用基于内存存储的InMemoryCache。如果没有对Node的缓存策略作针对性设置,在Pregel对象上设置的缓存策略将作为兜底。
  1. class PregelNode:   
  2.     cache_policy : CachePolicy | None
  3.     @cached_property
  4. def input_cache_key(self) -> INPUT_CACHE_KEY_TYPE       
  5. @dataclass(**_DC_KWARGS)
  6. class CachePolicy(Generic[KeyFuncT]):
  7.     key_func: KeyFuncT = default_cache_key  
  8. ttl: int | None = None
  9. KeyFuncT = TypeVar("KeyFuncT", bound=Callable[..., str | bytes])
  10. INPUT_CACHE_KEY_TYPE = tuple[Callable[..., Any], tuple[str, ...]]
  11. def default_cache_key(*args: Any, **kwargs: Any) -> str | bytes:
  12.     return pickle.dumps((_freeze(args), _freeze(kwargs)), protocol=5, fix_imports=False)
复制代码
在如下所示的演示程序中,对于创建的Pregel的唯一Node,它虽然具有两个输入Channel(foo和bar),但是对应的处理函数会将当前时间戳作为返回结果。我们为该Node设置了缓存策略,并将过期时间设置为30秒。
  1. class BaseCache(ABC, Generic[ValueT]):
  2.     serde: SerializerProtocol = JsonPlusSerializer(pickle_fallback=True)
  3.     def __init__(self, *, serde: SerializerProtocol | None = None) -> None:
  4.         self.serde = serde or self.serde
  5.     @abstractmethod
  6.     def get(self, keys: Sequence[FullKey]) -> dict[FullKey, ValueT]:
  7.     @abstractmethod
  8.     async def aget(self, keys: Sequence[FullKey]) -> dict[FullKey, ValueT]:
  9.     @abstractmethod
  10.     def set(self, pairs: Mapping[FullKey, tuple[ValueT, int | None]]) -> None:
  11.     @abstractmethod
  12.     async def aset(self, pairs: Mapping[FullKey, tuple[ValueT, int | None]]) -> None:
  13.     @abstractmethod
  14.     def clear(self, namespaces: Sequence[Namespace] | None = None) -> None:
  15.     @abstractmethod
  16. async def aclear(self, namespaces: Sequence[Namespace] | None = None) -> None:
  17. class Pregel(
  18.     PregelProtocol[StateT, ContextT, InputT, OutputT],
  19.     Generic[StateT, ContextT, InputT, OutputT]):
  20.     cache: BaseCache | None = None
  21.     cache_policy: CachePolicy | None = None
复制代码
我们以5秒为间隔调用了Pregel对象三次,前两次使用相同的输入({"foo":"abc", "bar":"xyz"})。三次调用的时间戳和输入输出会以如下的形式打印出来,我们可以清晰地看到前两次由于提供了相同的参数,所以得到了相同的结果,很明显第二次得到的是缓存的结果。
  1. from langgraph.channels import LastValue
  2. from langgraph.types import CachePolicy
  3. from langgraph.pregel import Pregel,NodeBuilder
  4. import datetime,time
  5. from langgraph.cache.memory import InMemoryCache
  6. node = (NodeBuilder()
  7.         .subscribe_to("foo","bar")
  8.         .do(lambda _: datetime.datetime.now())
  9.         .write_to("output")).build()
  10. node.cache_policy = CachePolicy(ttl=30)
  11. app = Pregel(
  12.     nodes={"body": node},
  13.     cache=InMemoryCache(),
  14.     channels={
  15.         "foo": LastValue(str),
  16.         "bar": LastValue(str),
  17.         "output": LastValue(str),
  18.     },
  19.     input_channels=["foo","bar"],
  20.     output_channels=["output"])
  21. input = {"foo":"abc", "bar":"xyz"}
  22. result = app.invoke(input=input)
  23. print(f"[{datetime.datetime.now()}]{input} -> {result['output']}")
  24. time.sleep(5)
  25. result = app.invoke(input=input)
  26. print(f"[{datetime.datetime.now()}]{input} -> {result['output']}")
  27. time.sleep(5)
  28. input = {"foo":"xyz", "bar":"abc"}
  29. result = app.invoke(input=input)
  30. print(f"[{datetime.datetime.now()}]{input} -> {result['output']}")
复制代码
6.补遗

前面已经介绍了PregelNode类型的大部分核心成员,对于如下几个遗漏的成员,我们在这里作一下概况性介绍。tags使我们可以在Node上打上相应的标签,而metadata则可以在它上面附加任意的元数据。
  1. [2026-01-31 23:51:20.178285]{'foo': 'abc', 'bar': 'xyz'} -> 2026-01-31 23:51:20.177192
  2. [2026-01-31 23:51:25.180527]{'foo': 'abc', 'bar': 'xyz'} -> 2026-01-31 23:51:20.177192
  3. [2026-01-31 23:51:30.183805]{'foo': 'xyz', 'bar': 'abc'} -> 2026-01-31 23:51:30.182490
复制代码
Node结合边构成了图,而图本身也可以作为一个Node参与构建一个更大的图,所以图具有一个嵌套的层级结构,一个Node可以包含一组子图,体现在subgraphs字段上。方法copy对返回自身的一个浅拷贝,至于四个方法(invoke、ainvoke、stream和astream)实现的两种调用模式,最终还是通过调用bound字段的Runnable对象的同名方法实现的。
我们最后使用最简单的语言对Pregel做一个总结:我们可以将 PregelNode 想象成一个智能反应堆,其中triggers是点火装置(决定什么时候开始),channels是原料管道(输入数据),mapper 是入料加工(数据预处理),bound 是核心反应室(业务逻辑),writers 是成品输送带(更新状态)。
7. NodeBuilder

为了让大家对表示Node的PregelNode类型有深入地理解,在前面的演示中我们大都采用直接对其字段进行设置的方式,实际上在真正的开发中基本不会这么做,而是选择使用NodeBuilder来构建它,后者提供更加精简的API。
  1. class PregelNode:        
  2.     tags : Sequence[str] | None
  3.     metadata : Mapping[str, Any] | None       
  4.     subgraphs : Sequence[PregelProtocol]   
  5.     def copy(self, update: dict[str, Any]) -> PregelNode   
  6.     def invoke(
  7.         self,
  8.         input: Any,
  9.         config: RunnableConfig | None = None,
  10.         **kwargs: Any | None,
  11.     ) -> Any   
  12.     async def ainvoke(
  13.         self,
  14.         input: Any,
  15.         config: RunnableConfig | None = None,
  16.         **kwargs: Any | None,
  17.     ) -> Any
  18.     def stream(
  19.         self,
  20.         input: Any,
  21.         config: RunnableConfig | None = None,
  22.         **kwargs: Any | None,
  23.     ) -> Iterator[Any]
  24.     async def astream(
  25.         self,
  26.         input: Any,
  27.         config: RunnableConfig | None = None,
  28.         **kwargs: Any | None,
  29.     ) -> AsyncIterator[Any]       
复制代码
如果构建的Node只需定义一个Channel,我们可以调用subscribe_only方法,它将以字符串的(不是列表)形式赋值给channels字段。subscribe_to方法默认会将指定的Channel同时添加到triggers和channels列表中,如果将read参数设置为False,指定的Channel只会作为输入添加到channels列表中。read_from方法指定的仅仅是输入Channel,所以只会添加到channels列表中。
如果自行构建PregelNode,针对Channel的输出会很麻烦,使用NodeBuilder的write_to方法就简单多了,我们只需要指定输出Channel的名称列表就可以了。如果需要多输出作更细粒度的控制,也可以指定一组ChannelWriteEntry对象。我们也可以利用write_to方法提供的关键字参数针对Channel的写入(此时参数名会作为输出Channel的名称),其类型_WriteValue定义如下,所以我们可以使用兼容的Lambda表达式简单快捷地完成输出。
  1. class NodeBuilder:
  2.     def subscribe_only(
  3.         self,
  4.         channel: str,
  5.     ) -> Self
  6.     def subscribe_to(
  7.         self,
  8.         *channels: str,
  9.         read: bool = True,
  10.     ) -> Self
  11.     def read_from(
  12.         self,
  13.         *channels: str,
  14.     ) -> Self
  15.     def write_to(
  16.         self,
  17.         *channels: str | ChannelWriteEntry,
  18.         **kwargs: _WriteValue,
  19.     ) -> Self
  20.     def meta(self, *tags: str, **metadata: Any) -> Self
  21.     def add_retry_policies(self, *policies: RetryPolicy) -> Self
  22.     def add_cache_policy(self, policy: CachePolicy) -> Self
  23.     def build(self) -> PregelNode
复制代码
前面介绍的打标签和附加元数据的功能可以调用NodeBuilder的meta方法来完成,失败重试和结果缓存策略则由add_retry_policies和add_cache_policy方法来提供。等所有设置完成之后,我们直接调用build方法将目标Node构建出来。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册