找回密码
 立即注册
首页 业界区 业界 [拆解LangChain执行引擎]非常规Pending Write的持久化 ...

[拆解LangChain执行引擎]非常规Pending Write的持久化

吞脚 3 小时前
PendingWrite三元组的第二部分表示写入的Channel,但是对于一些特殊的场景,比如出错、无写入、中断和恢复,它们的值不再是一个普通的Channel名称,而是使用如下的值:

  • __error__:执行Node对应的任务出现异常;
  • __no_writes__:Node任务成功执行,但是没有执行针对Channel的输出;
  • __interrupt__:任务中断;
  • __resume__:表示恢复执行提供的数据;
接下来我们两个例子来产生上述这几种特殊的Pending Write。我们先来模拟出错的场景,如下面的代码片段所示,我们执行的Pregel对象具有一个唯一的Node,它的处理函数直接抛出一个异常。
  1. from langgraph.pregel import Pregel, NodeBuilder
  2. from langgraph.channels import LastValue
  3. from langgraph.checkpoint.memory import InMemorySaver
  4. from typing import Any
  5. def handle(args: dict[str, Any])->None:
  6.     raise Exception("manllually raised exception")
  7. node = NodeBuilder().subscribe_to("start").do(handle)
  8. app = Pregel(
  9.     nodes={"body": node},
  10.     channels={"start": LastValue(str)},
  11.     checkpointer=InMemorySaver(),
  12.     input_channels=["start"],
  13.     output_channels=[],
  14. )
  15. config = {"configurable": {"thread_id": "123"}}
  16. try:
  17.     result = app.invoke({"start": "begin"}, config=config)
  18. except Exception as ex:
  19.     print(f"Caught exception:{ex}" )
  20. (_, _, _, _, pending_writes) = app.checkpointer.get_tuple(config)
  21. print(pending_writes)
复制代码
我们在try/except块中完成针对Pregel的调用,并捕捉和输出得到的异常信息。接下来我们调用Checkpointer(一个InMemorySaver对象)的get_tuple方法得到对应的CheckpointTuple元组,然后将pending_writes部分输出出来。从如下所示的输出结果可以看出,这个Pending Write三元组的Channel名称被设置为 __error__ ,整个Exception对象成为了写入的内容。
  1. Caught exception:manllually raised exception
  2. [('f9ff1e88-4d82-f417-ad11-8fd870bfe647', '__error__', "Exception('manllually raised exception')")]
复制代码
由于并不是所有的Node都有向Channel写入执行结果的需求,所以只要处理函数成功执行,即使没有Channel输出的行为,该任务的状态也会被视为成功,Checkpointer只是采用不同的形式来记录这种不需要写入的Pending Write。如下的这个程序不仅仅演示了这种无输出写入的场景,还同时模拟了中断和恢复。
  1. from langgraph.pregel import Pregel, NodeBuilder
  2. from langgraph.channels import LastValue
  3. from langgraph.checkpoint.memory import InMemorySaver
  4. from typing import Any
  5. from langgraph.types import Command, interrupt
  6. def foo(args: dict[str, Any]) -> list[str]:
  7.     resume1 = interrupt("1st interrupt")
  8.     assert resume1 == "1st resume"
  9.     resume2 = interrupt("2nd interrupt")
  10.     assert resume2 == "2nd resume"
  11.     resume3 = interrupt("3rd interrupt")
  12.     assert resume3 == "3rd resume"
  13.     return [resume1, resume2, resume3]
  14. def bar(args: dict[str, Any]) -> None:
  15.     pass
  16. app = Pregel(
  17.     nodes={
  18.         "foo": NodeBuilder().subscribe_only("start").do(foo).write_to("output"),
  19.         "bar": NodeBuilder().subscribe_only("start").do(bar),
  20.     },
  21.     channels={
  22.         "start": LastValue(str),
  23.         "output": LastValue(list[str]),
  24.     },
  25.     input_channels=["start"],
  26.     output_channels=["output"],
  27.     checkpointer=InMemorySaver(),
  28. )
  29. config = {"configurable": {"thread_id": "123"}}
  30. result = app.invoke(input={"start": "begin"}, config=config, stream_mode="tasks")
  31. (_, _, _, _, pending_writes) = app.checkpointer.get_tuple(config)
  32. print(f"After invoke:\n{pending_writes}")
  33. app.invoke(input=Command(resume="1st resume"), config=config)
  34. (_, _, _, _, pending_writes) = app.checkpointer.get_tuple(config)
  35. print(f"\nAfter resume 1:\n{pending_writes}")
  36. app.invoke(input=Command(resume="2nd resume"), config=config)
  37. (_, _, _, _, pending_writes) = app.checkpointer.get_tuple(config)
  38. print(f"\nAfter resume 2:\n{pending_writes}")
  39. result = app.invoke(input=Command(resume="3rd resume"), config=config)
  40. assert result == {"output": ["1st resume", "2nd resume", "3rd resume"]}
  41. (_, _, _, _, pending_writes) = app.checkpointer.get_tuple(config)
  42. print(f"\nAfter resume 3:\n{pending_writes}")
复制代码
如上面的代码片段所示,我们为Pregel提供了两个并行执行的节点foo和bar,其中bar对应的函数并未执行任何有效操作,也没有任何的输出。我们为节点foo对应的处理函数制造了三次人为中断,所以需要至少四次调用才能结束。
我们在创建的RunnableConfig对象中提供了统一的Thread ID,并将它作为后续方法调用的参数。针对Pregel的三次调用,第一次是为常规调用,后面两次分别是针对两次中断的恢复调用。我们在每次调用后,得到并输出Checkpointer记录下来的Pending Write。
从如下的输出结果可以看出,第一次常规调用后, 节点foo停在第一个中断处,节点bar成功执行但没有输出,所以Checkpointer将它们作为Pending Write记录下来,Channel名称分别是__interrupt__和__no_writes__,前者的写入内容是一个Interrupt对象,它具有我们指定的值“1st interrupt”。我们也看到了Interrupt对象具有一个唯一标识,在恢复调用时我们可以利用此标识为其指定针对性的恢复数据(Command(resume={"id":"resume value"))。
  1. After invoke:
  2. [('8d407c25-02f6-9101-d1b8-5a99c247edde', '__interrupt__', [Interrupt(value='1st interrupt', id='5603cdf275d8b8ba0633d272fa176fd3')]), ('22507855-e257-1b5b-eb1a-3c3fb0a071e9', '__no_writes__', None)]
  3. After resume 1:
  4. [('8d407c25-02f6-9101-d1b8-5a99c247edde', '__interrupt__', [Interrupt(value='2nd interrupt', id='5603cdf275d8b8ba0633d272fa176fd3')]), ('22507855-e257-1b5b-eb1a-3c3fb0a071e9', '__no_writes__', None), ('00000000-0000-0000-0000-000000000000', '__resume__', '1st resume'), ('8d407c25-02f6-9101-d1b8-5a99c247edde', '__resume__', ['1st resume'])]
  5. After resume 2:
  6. [('8d407c25-02f6-9101-d1b8-5a99c247edde', '__interrupt__', [Interrupt(value='3rd interrupt', id='5603cdf275d8b8ba0633d272fa176fd3')]), ('22507855-e257-1b5b-eb1a-3c3fb0a071e9', '__no_writes__', None), ('00000000-0000-0000-0000-000000000000', '__resume__', '2nd resume'), ('8d407c25-02f6-9101-d1b8-5a99c247edde', '__resume__', ['1st resume', '2nd resume'])]
  7. After resume 3:
  8. []
复制代码
针对第一个中断的恢复调用后,节点foo停在第二个中断处,此时Checkpointer会创建两个新的Pending Write持久化我们提供的Resume Value(“1st resume”),它的Channel名称就是__resume__,但为什么是两个呢?
这实际上反映了 Pregel 处理外部指令注入与Node内部消费的同步机制。第一个被称为全局Resume Value(Global Resume Value), 它代表从外部(通过Command(resume=...))注入到图中的原始指令。由于它不是由图内Node产生的,因此 Task ID 为空,它是唤醒整个暂停状态的“总开关”。第二个节点foo对全局Resume Value的消费记录,所以具有一个明确的Taks ID。当节点foo被唤醒并执行到interrupt行时,它会从全局Resume Value读取数据。为了保证幂等性和可回溯性,系统会将拿走了哪个Resume Value记录在它的任务路径下。
针对Resume的冗余设置是为了解决重入与回溯问题。全局记录证明了用户确实提供了这个值。Node记录证明了这个值确实被这个特定的interrupt函数调用消费了。一个Node内部可以连续调用多次interrupt函数,系统需要按顺序记录该Node消费过的所有Resume Value,以便在“时间旅行”或重试时能够精确对齐。
当我们调用interrupt函数实施人为中断时,底层实际上会抛出一个GraphInterrupt异常,Pregel通过捕获这个异常进而生成针对性的PendingWrite,所以针对同一个任务有可能有一个唯一的中断类型的PendingWrite。由于恢复执行总是会从头执行Node函数,所以基于中断的PendingWrite并不会恢复执行造成任何影响。所以当我们完成第二次恢复调用后,持久化的中断PendingWrite反映的是针对第二次interrupt函数的调用,对应Interrupt对象的值为2nd interrupt。
Resume Value必须按照顺序提供,因为每遇到一个interrpt函数的调用,都会利用前面介绍过的计算器提供的索引,从Resume Value列表中读取Resume Value作为该调用的返回值,所以持久化的第二个基于恢复的PendingWrite对应的值变成了包含两个Resume Value的列表(['1st resume', '2nd resume'])。
在针对第三个中断的恢复执行结束后,fooNode完成了它的执行任务,而bar对应的任务本身就是成功状态,所以整个Superstep顺利结束,自然也就不存在Pending Write了。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册