找回密码
 立即注册
首页 业界区 业界 [拆解LangChain执行引擎]回到过去,开启平行世界[上篇] ...

[拆解LangChain执行引擎]回到过去,开启平行世界[上篇]

郁兰娜 3 天前
Pregel还提供了如下所示的update_state/aupdate_state和bulk_update_state/abulk_update_state方法,我们可以调用它们以增量的方式修改部分状态。有一点需要明确的是,这些方法并非直接修改某个持久化的Checkpoint,而是在此基础上创建一个子Checkpoint,所以历史是不会被篡改的,只会在某个时间点开启了一个 平行世界 。创建的Checkpoint的标识会存储返回的RunnableConfig配置中。
这些方法总是以一个Node的名义模拟一个具体的任务来更新对应的状态值。虽然这个模拟的任务不会真正被执行,但是它必须关联一个有效的Node,因为需要借助于对应Node的writers 列表完成写入操作。针对单一任务的更新请求通过一个StateUpdate对象来表示,它的as_node和task_id字段表示状态更新的模拟任务的Node名称和任务标识。
  1. class Pregel(
  2.     PregelProtocol[StateT, ContextT, InputT, OutputT],
  3.     Generic[StateT, ContextT, InputT, OutputT]):
  4.    
  5.     def bulk_update_state(
  6.         self,
  7.         config: RunnableConfig,
  8.         supersteps: Sequence[Sequence[StateUpdate]],
  9.     ) -> RunnableConfig
  10.     async def abulk_update_state(
  11.         self,
  12.         config: RunnableConfig,
  13.         supersteps: Sequence[Sequence[StateUpdate]],
  14.      ) -> RunnableConfig
  15.     def update_state(
  16.         self,
  17.         config: RunnableConfig,
  18.         values: dict[str, Any] | Any | None,
  19.         as_node: str | None = None,
  20.         task_id: str | None = None,
  21.     ) -> RunnableConfig
  22.     async def aupdate_state(
  23.         self,
  24.         config: RunnableConfig,
  25.         values: dict[str, Any] | Any,
  26.         as_node: str | None = None,
  27.         task_id: str | None = None,
  28.     ) -> RunnableConfig
  29. class StateUpdate(NamedTuple):
  30.     values: dict[str, Any] | None
  31.     as_node: str | None = None
  32.     task_id: str | None = None
复制代码
1. 单一Node更新

update_state/aupdate_state方法用于单一Node的状态更新。如果没有利用as_node参数显式指定Node,这两个方法会默认使用最后一次实施更新的Node。那么如何确定最后一次状态更新来源于哪个Node呢?
还记得Checkpoint的channel_versions和versions_seen字段吗?前者返回所有Channel的最新版本,后者返回每个Node可见的Channel和版本。能够看到具有最高版本的那个Channel的所有Node都会被认为是实施了最新的状态更新。如果通过这种策略解析出来的Node不止一个,就会抛出异常并提醒我们提供确定的as_node,如下的演示程序体现了这一点。
  1. from langgraph.channels import LastValue
  2. from langgraph.pregel import Pregel, NodeBuilder
  3. from langgraph.checkpoint.memory import InMemorySaver
  4. from functools import partial
  5. def handle(node:str, args:dict)->str:
  6.     return node
  7. foo = (NodeBuilder()
  8.         .subscribe_to("start",read=False)
  9.         .do(partial(handle, "foo"))
  10.         .write_to("bar"))
  11. bar1 = (NodeBuilder()
  12.         .subscribe_to("bar",read=False)
  13.         .do(partial(handle, "bar1"))
  14.         .write_to("bar1"))
  15. bar2 = (NodeBuilder()
  16.         .subscribe_to("bar",read=False)
  17.         .do(partial(handle, "bar2"))
  18.         .write_to("bar2"))
  19. app = Pregel(
  20.     nodes={"foo": foo, "bar1": bar1, "bar2": bar2},
  21.     channels={
  22.         "start": LastValue(None),
  23.         "bar": LastValue(str),
  24.         "bar1": LastValue(str),
  25.         "bar2": LastValue(str),
  26.     },
  27.     input_channels=["start"],
  28.     output_channels=["bar1","bar2"],
  29.     checkpointer=InMemorySaver(),
  30. )
  31. config={"configurable": {"thread_id": "tx123"}}
  32. result = app.invoke(input={"start": None}, config=config)
  33. assert result== {"bar1": "bar1", "bar2": "bar2"}
  34. new_config = app.update_state(
  35.     config=config,
  36.     values= {"bar1": "bar1[new]"},
  37. )
  38. state = app.get_state(config=new_config)
  39. print(state.values)
复制代码
上面这个Pregel体现的执行流程很简单,启动的时候通过写入通道start驱动节点foo的执行,后者结束之后写入通道bar驱动节点bar1和bar2的执行,这两个Node最终会将自身的Node名称写入对应的通道bar1和bar2。在完成常规调用之后,我们调用update_state方法试图将最终通道bar1的值改写成bar1[new],但是最终会抛出InvalidUpdateError异常,并提示Ambiguous update, specify as_node。
由于没有在作为输入参数的RunnableConfig配置中指定Checkpoint ID,所以针对状态的修改是基于最后创建的Checkpoint进行的。按照我们上面说的逻辑,引擎会根据每个Node可见Channel的最高版本得到最近一次更新状态的Node。很明显,节点bar1和bar2订阅的通道bar具有更高的版本。由于不满足Node的唯一性,所以InvalidUpdateError异常被抛出。由于通道bar1是由节点bar1写入的,所以我们将as_node参数设置为 “bar1” (实际上你设置任意的Node名称都可以,只要对应Node存在即可)。
  1. new_config = app.update_state(
  2.     config=config,
  3.     values= {"bar1": "bar1[new]"},
  4.     as_node="bar1"
  5. )
  6. state = app.get_state(config=new_config)
  7. print(state.values)
复制代码
你以为这就结束了吗?虽然这次状态更新成功了,但是最新的状态却非我所愿。从如下的输出结果可以看出,“bar1”的值是“{'bar1': 'bar1[new]'}”,也就是说它是将作为参数values的字典整个作为了写入的值。
  1. {'start': None, 'bar': 'foo', 'bar1': {'bar1': 'bar1[new]'}, 'bar2': 'bar2'}
复制代码
要解释这个问题,就必须真正了解状态究竟是如何被更新的。我们之所以需要确定以哪个Node的名义更新状态,并不仅仅是为了审核的目的补充必要的信息,实际上整个更新操作依赖于对应PregelNode的writers列表。我们再回顾一下PregelNode如下所示的writers字段,它返回的Channel写入器体现为一组Runnable对象。
  1. class PregelNode:
  2.     writers: list[Runnable]
  3.     @cached_property
  4.     def flat_writers(self) -> list[Runnable]
  5. class ChannelWrite(RunnableCallable):
  6.     writes: list[ChannelWriteEntry | ChannelWriteTupleEntry | Send]
  7.     def __init__(
  8.         self,
  9.         writes: Sequence[ChannelWriteEntry | ChannelWriteTupleEntry | Send],
  10.         *,
  11.         tags: Sequence[str] | None = None,
  12.     )
  13. class ChannelWriteEntry(NamedTuple):
  14.     channel: str
  15.     value: Any = PASSTHROUGH
  16.     skip_none: bool = False
  17.     mapper: Callable | None = None
  18. class ChannelWriteTupleEntry(NamedTuple):
  19.     mapper: Callable[[Any], Sequence[tuple[str, Any]] | None]
  20.     value: Any = PASSTHROUGH
  21. static: Sequence[tuple[str, Any, str | None]] | None = None
  22. PASSTHROUGH = object()
复制代码
每个Channel写入器对应一个ChannelWrite对象,后者针对针对Channel的写入意图会被添加到writes字段对用的列表中,这是由一组ChannelWriteEntry、ChannelWriteTupleEntry或者Send对象的列表。当update_state/aupdate_state方法将Node确定下来后,这个列表被提取出来,对于ChannelWriteEntry和ChannelWriteTupleEntry,其values字段被替换成传入update_state/aupdate_state方法的values参数,仅此而已。这就是我们提供的字典作为整体被写入通道bar1的原因,所以调用update_state/aupdate_state方法的时候按照如下的方式提供具体的值就好。
  1. new_config = app.update_state(
  2.     config=config,
  3.     values= "bar1[new]",
  4.     as_node="bar1"
  5. )
复制代码
我们在前面说过,update_state/aupdate_state方法是通过在最新或者指定Checkpoint基础上创建一个新的Checkpoint,进而达到更新状态的目的,我们可以输出完整的历史来证明这一点。
  1. for state in app.get_state_history(config):
  2.     metadata = state.metadata
  3.     step=metadata['step']
  4.     source=metadata["source"]
  5.     print(f"step:{step}\nsource:{source}\nvalues: {state.values}\n")
复制代码
在完成状态更新后,我们使用上面的代码提取组成历史的每个快照,并将对应的Superstep、Source和Values输出来。在如下所示的输出中,Superstep 2对应的快照就是调用update_state方法产生的,这个快照还具有不同的Source(update)揭示它的与众不同。
  1. step:2
  2. source:update
  3. values: {'start': None, 'bar': 'foo', 'bar1': 'bar1[new]', 'bar2': 'bar2'}
  4. step:1
  5. source:loop
  6. values: {'start': None, 'bar': 'foo', 'bar1': 'bar1', 'bar2': 'bar2'}
  7. step:0
  8. source:loop
  9. values: {'start': None, 'bar': 'foo'}
  10. step:-1
  11. source:input
  12. values: {'start': None}
复制代码
2. 更新失效

如果我们对上述的更新原理不了解,在遇到一些状态更新失效的场景,可能永远找不到问题的症结。比如对于如下这个简单的例子,Pregel唯一的Node会将值foo写入Channel,调用之后针对结果的断言也证实了写入时成功的。
  1. from langgraph.channels import LastValue
  2. from langgraph.pregel import Pregel, NodeBuilder
  3. from langgraph.checkpoint.memory import InMemorySaver
  4. node = (NodeBuilder()
  5.         .subscribe_only("foo")
  6.         .do(lambda args: args)
  7.         .write_to(output=lambda _: "foo"))
  8. app = Pregel(
  9.     nodes={"node": node},
  10.     channels={
  11.         "foo": LastValue(str),
  12.         "output": LastValue(str),
  13.     },
  14.     input_channels=["foo"],
  15.     output_channels=["output"],
  16.     checkpointer=InMemorySaver(),
  17. )
  18. config={"configurable": {"thread_id": "tx123"}}
  19. result = app.invoke(input={"foo": "foo"}, config=config)
  20. assert state.values["output"] == "foo" # state remains unchanged
复制代码
我们本希望调用update_state方法将输出改写为bar。但是从调用get_state的结果来看状态并没有更新成功。那么是因为新的Checkpoint没有创建吗?为此我们按照如下的方式输出整个历史。
  1. for state in app.get_state_history(config):
  2.     metadata = state.metadata
  3.     step=metadata['step']
  4.     source=metadata["source"]
  5.     print(f"step {step}\nsource: {source}\nvalues: {state.values}")
  6.     print()
复制代码
从如下的输出结果可以看出,update_state方法调用对应的Checkpoint已经成功创建,但是它的状态(values)就是没有改变。
  1. step 1
  2. source: update
  3. values: {'start': None, 'output': 'foo'}
  4. step 0
  5. source: loop
  6. values: {'start': None, 'output': 'foo'}
  7. step -1
  8. source: input
  9. values: {'start': None}
复制代码
这个的问题出在Node的构建上面,由于我们调用调用NodeBuilder的write_to方法时采用了关键字参数来确定目标Channel,并以Lambda表达式的方式提供写入的值,并且Lambda表达式并没有使用原始输入,而是直接硬编码成foo 。这行代码将生成一个ChannelWriteEntry对象,并为它指定正确得Channel名称( “output” ),它的value不会被设置为处理函数的返回值,但是Lambda表达式转换成的Callable对象将作为mapper字段。由于update_state方法仅仅通过对Node的写入器稍加改造来完成Channel写入。对于这个背后创建的ChannelWriteEntry对象来说,改变的只有其value字段,mapper字段将保持不变。由于mapper无脑返回“foo”,所以状态永远也不可能被改变。
3. 对Pending Write的影响

前面介绍的更新都是在一个不存在Pending Write的状态上完成的,如果Superstep尚未完结,并且同时具有完成和中断的任务,update_state方法调用后整个状态又是什么样子呢?经过我的测试,不论我们以“完成任务对应的Node的名义”,还是以“中断任务对应的Node的名义”,针对update_state方法的调用都会以创建新的Checkpoint的方式强行闭合当前Superstep。
以如下这段程序为例,Pregel的两个并行执行的初始节点foo和bar,前者成功执行,后者会遇到中断。我们调用update_state方法以节点“bar” 的名义试图更新bar的状态。在调用前后,我们输出整个历史。
  1. from langgraph.channels import LastValue
  2. from langgraph.pregel import Pregel, NodeBuilder
  3. from langgraph.checkpoint.memory import InMemorySaver
  4. from langgraph.types import interrupt
  5. def handle( args:dict)->str:
  6.     resume = interrupt("Resuming execution")
  7.     return resume
  8. foo = (NodeBuilder()
  9.         .subscribe_to("start",read=False)      
  10.         .do(lambda args:args)
  11.         .write_to("foo"))
  12. bar = (NodeBuilder()
  13.         .subscribe_to("start",read=False)
  14.         .do(handle)
  15.         .write_to("bar"))
  16. app = Pregel(
  17.     nodes={"foo": foo, "bar": bar},
  18.     channels={
  19.         "start": LastValue(None),
  20.         "foo": LastValue(str),
  21.         "bar": LastValue(str),
  22.     },
  23.     input_channels=["start"],
  24.     output_channels=["foo","bar"],
  25.     checkpointer=InMemorySaver(),
  26. )
  27. config={"configurable": {"thread_id": "tx123"}}
  28. def show_history(config):
  29.      for _,checkpoint, metadata,_,pending_writes in app.checkpointer.list(config):
  30.         step=metadata['step']
  31.         source=metadata["source"]
  32.         print(
  33. f"step:{step}\nsource:{source}\nvalues: {checkpoint['channel_values']}\npending_writes: {pending_writes}\n")
  34. app.invoke(input={"start": None}, config=config)
  35. print("After invoke:")
  36. show_history(config)
  37. new_config = app.update_state(
  38.     config=config,
  39.     values= "updated value",
  40.     as_node="bar"
  41. )
  42. print("\nAfter update_state:")
  43. show_history(config)
复制代码
从如下的输出结果可以看出,正常调用之后确实遇到了中断。update_state方法调用之后,通道bar的状态确实被更新。但是新的Checkpoint被创建后,之前的Pending Write将不复存在。
  1. After invoke:
  2. step:-1
  3. source:input
  4. values: {'start': None}
  5. pending_writes: [('a2357188-2d04-182f-8672-0328182f68a0', '__interrupt__', [Interrupt(value='Resuming execution', id='fcb47fce081f41d1e1141d004a9b66f9')]), ('10b0e7ea-3453-e5ad-18a7-eb47f3caf108', 'foo', {})]
  6. After update_state:
  7. step:0
  8. source:update
  9. values: {'start': None, 'bar': 'updated value'}
  10. pending_writes: []
  11. step:-1
  12. source:input
  13. values: {'start': None}
  14. pending_writes: [('a2357188-2d04-182f-8672-0328182f68a0', '__interrupt__', [Interrupt(value='Resuming execution', id='fcb47fce081f41d1e1141d004a9b66f9')]), ('10b0e7ea-3453-e5ad-18a7-eb47f3caf108', 'foo', {}), ('a2357188-2d04-182f-8672-0328182f68a0', 'bar', 'updated value')]
复制代码
本例中我们是以中断节点的名义对状态实施修改的,所以Pending Write被新的状态抹除还说得过去的。但是如果我们按照如下的方式以成功执行的节点foo的名义修改通道foo的状态呢?
  1. new_config = app.update_state(
  2.     config=config,
  3.     values= "updated value",
  4.     as_node="foo"
  5. )
复制代码
从如下所示的输出结果可以看出,虽然我们对中断节点bar没有实施任何操作,update_state方法依然会将其Pending Write抹除。我们从这个例子大体可以看出update_state放背后的逻辑:针对最终状态的更新总是在最新的Checkpoint描述的状态下进行,并且创建一个新的Checkpoint作为最终的状态。
  1. After invoke:
  2. step:-1
  3. source:input
  4. values: {'start': None}
  5. pending_writes: [('87a80b8a-6c7b-a026-68b5-dd39d405a6a4', 'foo', {}), ('8a53715a-dc21-56ce-d0a6-d13ff6604d15', '__interrupt__', [Interrupt(value='Resuming execution', id='8672a7e5d7233e1f8c11fc9ff4b00c56')])]
  6. After update_state:
  7. step:0
  8. source:update
  9. values: {'start': None, 'foo': 'updated value'}
  10. pending_writes: []
  11. step:-1
  12. source:input
  13. values: {'start': None}
  14. pending_writes: [('87a80b8a-6c7b-a026-68b5-dd39d405a6a4', 'foo', {}), ('8a53715a-dc21-56ce-d0a6-d13ff6604d15', '__interrupt__', [Interrupt(value='Resuming execution', id='8672a7e5d7233e1f8c11fc9ff4b00c56')])]
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

昨天 08:51

举报

您需要登录后才可以回帖 登录 | 立即注册