Pregel还提供了如下所示的update_state/aupdate_state和bulk_update_state/abulk_update_state方法,我们可以调用它们以增量的方式修改部分状态。有一点需要明确的是,这些方法并非直接修改某个持久化的Checkpoint,而是在此基础上创建一个子Checkpoint,所以历史是不会被篡改的,只会在某个时间点开启了一个 平行世界 。创建的Checkpoint的标识会存储返回的RunnableConfig配置中。
这些方法总是以一个Node的名义模拟一个具体的任务来更新对应的状态值。虽然这个模拟的任务不会真正被执行,但是它必须关联一个有效的Node,因为需要借助于对应Node的writers 列表完成写入操作。针对单一任务的更新请求通过一个StateUpdate对象来表示,它的as_node和task_id字段表示状态更新的模拟任务的Node名称和任务标识。- class Pregel(
- PregelProtocol[StateT, ContextT, InputT, OutputT],
- Generic[StateT, ContextT, InputT, OutputT]):
-
- def bulk_update_state(
- self,
- config: RunnableConfig,
- supersteps: Sequence[Sequence[StateUpdate]],
- ) -> RunnableConfig
- async def abulk_update_state(
- self,
- config: RunnableConfig,
- supersteps: Sequence[Sequence[StateUpdate]],
- ) -> RunnableConfig
- def update_state(
- self,
- config: RunnableConfig,
- values: dict[str, Any] | Any | None,
- as_node: str | None = None,
- task_id: str | None = None,
- ) -> RunnableConfig
- async def aupdate_state(
- self,
- config: RunnableConfig,
- values: dict[str, Any] | Any,
- as_node: str | None = None,
- task_id: str | None = None,
- ) -> RunnableConfig
- class StateUpdate(NamedTuple):
- values: dict[str, Any] | None
- as_node: str | None = None
- task_id: str | None = None
复制代码 1. 单一Node更新
update_state/aupdate_state方法用于单一Node的状态更新。如果没有利用as_node参数显式指定Node,这两个方法会默认使用最后一次实施更新的Node。那么如何确定最后一次状态更新来源于哪个Node呢?
还记得Checkpoint的channel_versions和versions_seen字段吗?前者返回所有Channel的最新版本,后者返回每个Node可见的Channel和版本。能够看到具有最高版本的那个Channel的所有Node都会被认为是实施了最新的状态更新。如果通过这种策略解析出来的Node不止一个,就会抛出异常并提醒我们提供确定的as_node,如下的演示程序体现了这一点。- from langgraph.channels import LastValue
- from langgraph.pregel import Pregel, NodeBuilder
- from langgraph.checkpoint.memory import InMemorySaver
- from functools import partial
- def handle(node:str, args:dict)->str:
- return node
- foo = (NodeBuilder()
- .subscribe_to("start",read=False)
- .do(partial(handle, "foo"))
- .write_to("bar"))
- bar1 = (NodeBuilder()
- .subscribe_to("bar",read=False)
- .do(partial(handle, "bar1"))
- .write_to("bar1"))
- bar2 = (NodeBuilder()
- .subscribe_to("bar",read=False)
- .do(partial(handle, "bar2"))
- .write_to("bar2"))
- app = Pregel(
- nodes={"foo": foo, "bar1": bar1, "bar2": bar2},
- channels={
- "start": LastValue(None),
- "bar": LastValue(str),
- "bar1": LastValue(str),
- "bar2": LastValue(str),
- },
- input_channels=["start"],
- output_channels=["bar1","bar2"],
- checkpointer=InMemorySaver(),
- )
- config={"configurable": {"thread_id": "tx123"}}
- result = app.invoke(input={"start": None}, config=config)
- assert result== {"bar1": "bar1", "bar2": "bar2"}
- new_config = app.update_state(
- config=config,
- values= {"bar1": "bar1[new]"},
- )
- state = app.get_state(config=new_config)
- print(state.values)
复制代码 上面这个Pregel体现的执行流程很简单,启动的时候通过写入通道start驱动节点foo的执行,后者结束之后写入通道bar驱动节点bar1和bar2的执行,这两个Node最终会将自身的Node名称写入对应的通道bar1和bar2。在完成常规调用之后,我们调用update_state方法试图将最终通道bar1的值改写成bar1[new],但是最终会抛出InvalidUpdateError异常,并提示Ambiguous update, specify as_node。
由于没有在作为输入参数的RunnableConfig配置中指定Checkpoint ID,所以针对状态的修改是基于最后创建的Checkpoint进行的。按照我们上面说的逻辑,引擎会根据每个Node可见Channel的最高版本得到最近一次更新状态的Node。很明显,节点bar1和bar2订阅的通道bar具有更高的版本。由于不满足Node的唯一性,所以InvalidUpdateError异常被抛出。由于通道bar1是由节点bar1写入的,所以我们将as_node参数设置为 “bar1” (实际上你设置任意的Node名称都可以,只要对应Node存在即可)。- new_config = app.update_state(
- config=config,
- values= {"bar1": "bar1[new]"},
- as_node="bar1"
- )
- state = app.get_state(config=new_config)
- print(state.values)
复制代码 你以为这就结束了吗?虽然这次状态更新成功了,但是最新的状态却非我所愿。从如下的输出结果可以看出,“bar1”的值是“{'bar1': 'bar1[new]'}”,也就是说它是将作为参数values的字典整个作为了写入的值。- {'start': None, 'bar': 'foo', 'bar1': {'bar1': 'bar1[new]'}, 'bar2': 'bar2'}
复制代码 要解释这个问题,就必须真正了解状态究竟是如何被更新的。我们之所以需要确定以哪个Node的名义更新状态,并不仅仅是为了审核的目的补充必要的信息,实际上整个更新操作依赖于对应PregelNode的writers列表。我们再回顾一下PregelNode如下所示的writers字段,它返回的Channel写入器体现为一组Runnable对象。- class PregelNode:
- writers: list[Runnable]
- @cached_property
- def flat_writers(self) -> list[Runnable]
- class ChannelWrite(RunnableCallable):
- writes: list[ChannelWriteEntry | ChannelWriteTupleEntry | Send]
- def __init__(
- self,
- writes: Sequence[ChannelWriteEntry | ChannelWriteTupleEntry | Send],
- *,
- tags: Sequence[str] | None = None,
- )
- class ChannelWriteEntry(NamedTuple):
- channel: str
- value: Any = PASSTHROUGH
- skip_none: bool = False
- mapper: Callable | None = None
- class ChannelWriteTupleEntry(NamedTuple):
- mapper: Callable[[Any], Sequence[tuple[str, Any]] | None]
- value: Any = PASSTHROUGH
- static: Sequence[tuple[str, Any, str | None]] | None = None
- PASSTHROUGH = object()
复制代码 每个Channel写入器对应一个ChannelWrite对象,后者针对针对Channel的写入意图会被添加到writes字段对用的列表中,这是由一组ChannelWriteEntry、ChannelWriteTupleEntry或者Send对象的列表。当update_state/aupdate_state方法将Node确定下来后,这个列表被提取出来,对于ChannelWriteEntry和ChannelWriteTupleEntry,其values字段被替换成传入update_state/aupdate_state方法的values参数,仅此而已。这就是我们提供的字典作为整体被写入通道bar1的原因,所以调用update_state/aupdate_state方法的时候按照如下的方式提供具体的值就好。- new_config = app.update_state(
- config=config,
- values= "bar1[new]",
- as_node="bar1"
- )
复制代码 我们在前面说过,update_state/aupdate_state方法是通过在最新或者指定Checkpoint基础上创建一个新的Checkpoint,进而达到更新状态的目的,我们可以输出完整的历史来证明这一点。- for state in app.get_state_history(config):
- metadata = state.metadata
- step=metadata['step']
- source=metadata["source"]
- print(f"step:{step}\nsource:{source}\nvalues: {state.values}\n")
复制代码 在完成状态更新后,我们使用上面的代码提取组成历史的每个快照,并将对应的Superstep、Source和Values输出来。在如下所示的输出中,Superstep 2对应的快照就是调用update_state方法产生的,这个快照还具有不同的Source(update)揭示它的与众不同。- step:2
- source:update
- values: {'start': None, 'bar': 'foo', 'bar1': 'bar1[new]', 'bar2': 'bar2'}
- step:1
- source:loop
- values: {'start': None, 'bar': 'foo', 'bar1': 'bar1', 'bar2': 'bar2'}
- step:0
- source:loop
- values: {'start': None, 'bar': 'foo'}
- step:-1
- source:input
- values: {'start': None}
复制代码 2. 更新失效
如果我们对上述的更新原理不了解,在遇到一些状态更新失效的场景,可能永远找不到问题的症结。比如对于如下这个简单的例子,Pregel唯一的Node会将值foo写入Channel,调用之后针对结果的断言也证实了写入时成功的。- from langgraph.channels import LastValue
- from langgraph.pregel import Pregel, NodeBuilder
- from langgraph.checkpoint.memory import InMemorySaver
- node = (NodeBuilder()
- .subscribe_only("foo")
- .do(lambda args: args)
- .write_to(output=lambda _: "foo"))
- app = Pregel(
- nodes={"node": node},
- channels={
- "foo": LastValue(str),
- "output": LastValue(str),
- },
- input_channels=["foo"],
- output_channels=["output"],
- checkpointer=InMemorySaver(),
- )
- config={"configurable": {"thread_id": "tx123"}}
- result = app.invoke(input={"foo": "foo"}, config=config)
- assert state.values["output"] == "foo" # state remains unchanged
复制代码 我们本希望调用update_state方法将输出改写为bar。但是从调用get_state的结果来看状态并没有更新成功。那么是因为新的Checkpoint没有创建吗?为此我们按照如下的方式输出整个历史。- for state in app.get_state_history(config):
- metadata = state.metadata
- step=metadata['step']
- source=metadata["source"]
- print(f"step {step}\nsource: {source}\nvalues: {state.values}")
- print()
复制代码 从如下的输出结果可以看出,update_state方法调用对应的Checkpoint已经成功创建,但是它的状态(values)就是没有改变。- step 1
- source: update
- values: {'start': None, 'output': 'foo'}
- step 0
- source: loop
- values: {'start': None, 'output': 'foo'}
- step -1
- source: input
- values: {'start': None}
复制代码 这个的问题出在Node的构建上面,由于我们调用调用NodeBuilder的write_to方法时采用了关键字参数来确定目标Channel,并以Lambda表达式的方式提供写入的值,并且Lambda表达式并没有使用原始输入,而是直接硬编码成foo 。这行代码将生成一个ChannelWriteEntry对象,并为它指定正确得Channel名称( “output” ),它的value不会被设置为处理函数的返回值,但是Lambda表达式转换成的Callable对象将作为mapper字段。由于update_state方法仅仅通过对Node的写入器稍加改造来完成Channel写入。对于这个背后创建的ChannelWriteEntry对象来说,改变的只有其value字段,mapper字段将保持不变。由于mapper无脑返回“foo”,所以状态永远也不可能被改变。
3. 对Pending Write的影响
前面介绍的更新都是在一个不存在Pending Write的状态上完成的,如果Superstep尚未完结,并且同时具有完成和中断的任务,update_state方法调用后整个状态又是什么样子呢?经过我的测试,不论我们以“完成任务对应的Node的名义”,还是以“中断任务对应的Node的名义”,针对update_state方法的调用都会以创建新的Checkpoint的方式强行闭合当前Superstep。
以如下这段程序为例,Pregel的两个并行执行的初始节点foo和bar,前者成功执行,后者会遇到中断。我们调用update_state方法以节点“bar” 的名义试图更新bar的状态。在调用前后,我们输出整个历史。- from langgraph.channels import LastValue
- from langgraph.pregel import Pregel, NodeBuilder
- from langgraph.checkpoint.memory import InMemorySaver
- from langgraph.types import interrupt
- def handle( args:dict)->str:
- resume = interrupt("Resuming execution")
- return resume
- foo = (NodeBuilder()
- .subscribe_to("start",read=False)
- .do(lambda args:args)
- .write_to("foo"))
- bar = (NodeBuilder()
- .subscribe_to("start",read=False)
- .do(handle)
- .write_to("bar"))
- app = Pregel(
- nodes={"foo": foo, "bar": bar},
- channels={
- "start": LastValue(None),
- "foo": LastValue(str),
- "bar": LastValue(str),
- },
- input_channels=["start"],
- output_channels=["foo","bar"],
- checkpointer=InMemorySaver(),
- )
- config={"configurable": {"thread_id": "tx123"}}
- def show_history(config):
- for _,checkpoint, metadata,_,pending_writes in app.checkpointer.list(config):
- step=metadata['step']
- source=metadata["source"]
- print(
- f"step:{step}\nsource:{source}\nvalues: {checkpoint['channel_values']}\npending_writes: {pending_writes}\n")
- app.invoke(input={"start": None}, config=config)
- print("After invoke:")
- show_history(config)
- new_config = app.update_state(
- config=config,
- values= "updated value",
- as_node="bar"
- )
- print("\nAfter update_state:")
- show_history(config)
复制代码 从如下的输出结果可以看出,正常调用之后确实遇到了中断。update_state方法调用之后,通道bar的状态确实被更新。但是新的Checkpoint被创建后,之前的Pending Write将不复存在。- After invoke:
- step:-1
- source:input
- values: {'start': None}
- pending_writes: [('a2357188-2d04-182f-8672-0328182f68a0', '__interrupt__', [Interrupt(value='Resuming execution', id='fcb47fce081f41d1e1141d004a9b66f9')]), ('10b0e7ea-3453-e5ad-18a7-eb47f3caf108', 'foo', {})]
- After update_state:
- step:0
- source:update
- values: {'start': None, 'bar': 'updated value'}
- pending_writes: []
- step:-1
- source:input
- values: {'start': None}
- pending_writes: [('a2357188-2d04-182f-8672-0328182f68a0', '__interrupt__', [Interrupt(value='Resuming execution', id='fcb47fce081f41d1e1141d004a9b66f9')]), ('10b0e7ea-3453-e5ad-18a7-eb47f3caf108', 'foo', {}), ('a2357188-2d04-182f-8672-0328182f68a0', 'bar', 'updated value')]
复制代码 本例中我们是以中断节点的名义对状态实施修改的,所以Pending Write被新的状态抹除还说得过去的。但是如果我们按照如下的方式以成功执行的节点foo的名义修改通道foo的状态呢?- new_config = app.update_state(
- config=config,
- values= "updated value",
- as_node="foo"
- )
复制代码 从如下所示的输出结果可以看出,虽然我们对中断节点bar没有实施任何操作,update_state方法依然会将其Pending Write抹除。我们从这个例子大体可以看出update_state放背后的逻辑:针对最终状态的更新总是在最新的Checkpoint描述的状态下进行,并且创建一个新的Checkpoint作为最终的状态。- After invoke:
- step:-1
- source:input
- values: {'start': None}
- pending_writes: [('87a80b8a-6c7b-a026-68b5-dd39d405a6a4', 'foo', {}), ('8a53715a-dc21-56ce-d0a6-d13ff6604d15', '__interrupt__', [Interrupt(value='Resuming execution', id='8672a7e5d7233e1f8c11fc9ff4b00c56')])]
- After update_state:
- step:0
- source:update
- values: {'start': None, 'foo': 'updated value'}
- pending_writes: []
- step:-1
- source:input
- values: {'start': None}
- pending_writes: [('87a80b8a-6c7b-a026-68b5-dd39d405a6a4', 'foo', {}), ('8a53715a-dc21-56ce-d0a6-d13ff6604d15', '__interrupt__', [Interrupt(value='Resuming execution', id='8672a7e5d7233e1f8c11fc9ff4b00c56')])]
复制代码 来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |