invoke/ainvoke方法看起来是采用简单的请求/回复消息交换模式,客户端需等待整个流程执行完毕后才能得到结果,其实方法背后还是会调用stream/astream方法以流的方式进行交互。如果我们直接调用调用这两个方法,并采用相应的流模式,我们就能有效解决客户端长时间无响应的问题,实时地得到对方的反馈。- class Pregel(
- PregelProtocol[StateT, ContextT, InputT, OutputT],
- Generic[StateT, ContextT, InputT, OutputT]):
-
- def stream(
- self,
- input: InputT | Command | None,
- config: RunnableConfig | None = None,
- *,
- context: ContextT | None = None,
- stream_mode: StreamMode | Sequence[StreamMode] | None = None,
- print_mode: StreamMode | Sequence[StreamMode] = (),
- output_keys: str | Sequence[str] | None = None,
- interrupt_before: All | Sequence[str] | None = None,
- interrupt_after: All | Sequence[str] | None = None,
- durability: Durability | None = None,
- subgraphs: bool = False,
- debug: bool | None = None,
- **kwargs: Unpack[DeprecatedKwargs],
- ) -> Iterator[dict[str, Any] | Any]
-
- async def astream(
- self,
- input: InputT | Command | None,
- config: RunnableConfig | None = None,
- *,
- context: ContextT | None = None,
- stream_mode: StreamMode | Sequence[StreamMode] | None = None,
- print_mode: StreamMode | Sequence[StreamMode] = (),
- output_keys: str | Sequence[str] | None = None,
- interrupt_before: All | Sequence[str] | None = None,
- interrupt_after: All | Sequence[str] | None = None,
- durability: Durability | None = None,
- subgraphs: bool = False,
- debug: bool | None = None,
- **kwargs: Unpack[DeprecatedKwargs],
- ) -> AsyncIterator[dict[str, Any] | Any]
复制代码 在stream/astream方法的众多参数中,表示流模式的stream_mode参数最为重要,其对应类型StreamMode以字符串字面量的形式定义了七个选项。流是Pregel引擎向调用者提供数据的基本工作方法,它采用订阅发布的形式。Pregel对象发布的内容由对它订阅决定,因为发布客户端不敢兴趣的内容不但毫无意义,而且还会对影响造成极大的影响。- StreamMode = Literal[
- "values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"
- ]
复制代码 在调用stream/astream方法时,我们可以根据需要指定一个或者多个流模式(以StreamMode序列的形式)。如果没有显式设置(None),Pregel对象自身的stream_mode字段会作为兜底,该字典的默认值为“values。如果当前Pregel对象以子图的形式被调用,会默认使用values模式,subgraphs参数用于控制是否希望得到子图的输出。下面列出了七种流模式对应的输出内容:
- values:在每个Superstep结束后输出全部Channel的值;
- updates:针对每个Node输出由它更新的Channel值;
- checkpoints:在创建新的Checkpoint的时候,输出与get_state方法返回值具有相同结构的内容;
- tasks:在Node任务开始和结束的时候输出任务ID、Node名称和其他相关信息;
- debug:可以简单认为是tasks + checkpoints;
- messages:输出语言模型产生的Token和相关元数据;
- 开发者在Node处理函数中利用StreamWriter自行输出的内容;
如果混合使用多种流模式,stream/astream方法会返回一个字典,自带的Key表示当前输出采用的流模式。对于单一模式的调用,会直接返回输出的内容。如果采用custom模式,Node处理方法可以利用SteamWriter向客户端实时输出自定义的内容。StreamWriter和静态上下文一样,都属于当前Runtime的一部分,后者可以利用注入Node处理函数的RunnableConfig提取。下面演示程序会使用所有的流模式,我们在每个Node的处理函数中利用StreamWriter输出当前的Node名称。- from langgraph.checkpoint.memory import InMemorySaver
- from langgraph.pregel import Pregel, NodeBuilder
- from langgraph.channels import LastValue, BinaryOperatorAggregate
- import operator
- from functools import partial
- from langchain_core.runnables import RunnableConfig
- from typing import Any,Sequence
- from langgraph.runtime import Runtime
- from langgraph.types import StreamWriter,StreamMode
- from collections import defaultdict
- def handle(node: str, inputs: dict[str, Any], config: RunnableConfig) -> list[str]:
- runtime:Runtime = config["configurable"].get("__pregel_runtime")
- writer:StreamWriter = runtime.stream_writer
- writer(f"node '{node}' is called.")
- return [node]
- foo = (NodeBuilder()
- .subscribe_to("foo",read = False)
- .do(partial(handle, "foo"))
- .write_to(bar="triggered by foo")
- )
- bar1 = (NodeBuilder()
- .subscribe_to("bar",read = False)
- .do(partial(handle, "bar1"))
- .write_to("output")
- )
- bar2 = (NodeBuilder()
- .subscribe_to("bar",read = False)
- .do(partial(handle, "bar2"))
- .write_to("output"))
- app = Pregel(
- nodes={"foo": foo, "bar1": bar1, "bar2": bar2},
- channels={
- "foo": LastValue(str),
- "bar": LastValue(str),
- "output": BinaryOperatorAggregate(list, operator.add),
- },
- input_channels=["foo"],
- output_channels=["output"],
- checkpointer=InMemorySaver(),
- )
- config={"configurable": {"thread_id": "123"}}
- stream_mode: Sequence[StreamMode] = ["values", "updates","checkpoints","tasks","debug","custom"]
- result: defaultdict[str, list[str]] = defaultdict(list)
- for (mode,chunk) in app.stream(input={"foo": None}, stream_mode= stream_mode, config=config):
- result[mode].append(chunk)
- for mode,chunks in result.items():
- index = 1
- for chunk in chunks:
- print(f"{index}.[{mode}] {chunk}")
- index += 1
- print()
复制代码 创建的Pregel由节点foo、bar1和bar2构成。节点foo率先执行,bar1和bar2随后并行执行。我们将调用stream方法收集到的内容根据流模式分组进行输出。我们来分析一下如下的输出结果:
- 由于三个Node都会涉及到针对Channel的更新,所以会有三个updates模式的输出。
- 整个流程涉及三个Supperstep,两个values模式的输出的全量的状态对应于后两个Superstep。
- 三个Node对应三个任务,所以具有六个tasks模式的输出反映这三个任务的开始和结束。
- 经历的三个Superstep对应三次Checkpoint的创建,所以我们能看到三个checkpoints模式的输出。
- 六个tasks加三个checkpoints,所以有九个debug模式的输出。
- 三个Node中针对StreamWriter的调用对应三个custom模式的输出。
- 1.[checkpoints] {'config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ea-6dad-bfff-aaf158b4ced6'}}, 'parent_config': None, 'values': {'foo': None, 'output': []}, 'metadata': {'source': 'input', 'step': -1, 'parents': {}}, 'next': ['foo'], 'tasks': [{'id': 'd40d3fbc-0f70-33fe-1b31-51a3e12846fe', 'name': 'foo', 'interrupts': (), 'state': None}]}
- 2.[checkpoints] {'config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ef-658f-8000-fe11b210fb05'}}, 'parent_config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ea-6dad-bfff-aaf158b4ced6'}}, 'values': {'foo': None, 'bar': 'triggered by foo', 'output': []}, 'metadata': {'source': 'loop', 'step': 0, 'parents': {}}, 'next': ['bar1', 'bar2'], 'tasks': [{'id': 'ca9f30fb-ebea-87bf-074c-9a397616125a', 'name': 'bar1', 'interrupts': (), 'state': None}, {'id': 'c4540722-5a7a-4df3-14e1-abe0fddc9d73', 'name': 'bar2', 'interrupts': (), 'state': None}]}
- 3.[checkpoints] {'config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9f3-6cbf-8001-e4219a2f4b58'}}, 'parent_config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ef-658f-8000-fe11b210fb05'}}, 'values': {'foo': None, 'bar': 'triggered by foo', 'output': ['bar1', 'bar2']}, 'metadata': {'source': 'loop', 'step': 1, 'parents': {}}, 'next': [], 'tasks': []}
- 1.[debug] {'step': -1, 'timestamp': '2026-01-27T00:08:26.865918+00:00', 'type': 'checkpoint', 'payload': {'config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ea-6dad-bfff-aaf158b4ced6'}}, 'parent_config': None, 'values': {'foo': None, 'output': []}, 'metadata': {'source': 'input', 'step': -1, 'parents': {}}, 'next': ['foo'], 'tasks': [{'id': 'd40d3fbc-0f70-33fe-1b31-51a3e12846fe', 'name': 'foo', 'interrupts': (), 'state': None}]}}
- 2.[debug] {'step': 0, 'timestamp': '2026-01-27T00:08:26.865933+00:00', 'type': 'task', 'payload': {'id': 'd40d3fbc-0f70-33fe-1b31-51a3e12846fe', 'name': 'foo', 'input': {}, 'triggers': ('foo',)}}
- 3.[debug] {'step': 0, 'timestamp': '2026-01-27T00:08:26.866516+00:00', 'type': 'task_result', 'payload': {'id': 'd40d3fbc-0f70-33fe-1b31-51a3e12846fe', 'name': 'foo', 'error': None, 'result': {'bar': 'triggered by foo'}, 'interrupts': []}}
- 4.[debug] {'step': 0, 'timestamp': '2026-01-27T00:08:26.867326+00:00', 'type': 'checkpoint', 'payload': {'config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ef-658f-8000-fe11b210fb05'}}, 'parent_config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ea-6dad-bfff-aaf158b4ced6'}}, 'values': {'foo': None, 'bar': 'triggered by foo', 'output': []}, 'metadata': {'source': 'loop', 'step': 0, 'parents': {}}, 'next': ['bar1', 'bar2'], 'tasks': [{'id': 'ca9f30fb-ebea-87bf-074c-9a397616125a', 'name': 'bar1', 'interrupts': (), 'state': None}, {'id': 'c4540722-5a7a-4df3-14e1-abe0fddc9d73', 'name': 'bar2', 'interrupts': (), 'state': None}]}}
- 5.[debug] {'step': 1, 'timestamp': '2026-01-27T00:08:26.867338+00:00', 'type': 'task', 'payload': {'id': 'ca9f30fb-ebea-87bf-074c-9a397616125a', 'name': 'bar1', 'input': {}, 'triggers': ('bar',)}}
- 6.[debug] {'step': 1, 'timestamp': '2026-01-27T00:08:26.867343+00:00', 'type': 'task', 'payload': {'id': 'c4540722-5a7a-4df3-14e1-abe0fddc9d73', 'name': 'bar2', 'input': {}, 'triggers': ('bar',)}}
- 7.[debug] {'step': 1, 'timestamp': '2026-01-27T00:08:26.868043+00:00', 'type': 'task_result', 'payload': {'id': 'c4540722-5a7a-4df3-14e1-abe0fddc9d73', 'name': 'bar2', 'error': None, 'result': {'output': ['bar2']}, 'interrupts': []}}
- 8.[debug] {'step': 1, 'timestamp': '2026-01-27T00:08:26.868117+00:00', 'type': 'task_result', 'payload': {'id': 'ca9f30fb-ebea-87bf-074c-9a397616125a', 'name': 'bar1', 'error': None, 'result': {'output': ['bar1']}, 'interrupts': []}}
- 9.[debug] {'step': 1, 'timestamp': '2026-01-27T00:08:26.868533+00:00', 'type': 'checkpoint', 'payload': {'config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9f3-6cbf-8001-e4219a2f4b58'}}, 'parent_config': {'configurable': {'checkpoint_ns': '', 'thread_id': '123', 'checkpoint_id': '1f0fb144-d9ef-658f-8000-fe11b210fb05'}}, 'values': {'foo': None, 'bar': 'triggered by foo', 'output': ['bar1', 'bar2']}, 'metadata': {'source': 'loop', 'step': 1, 'parents': {}}, 'next': [], 'tasks': []}}
- 1.[tasks] {'id': 'd40d3fbc-0f70-33fe-1b31-51a3e12846fe', 'name': 'foo', 'input': {}, 'triggers': ('foo',)}
- 2.[tasks] {'id': 'd40d3fbc-0f70-33fe-1b31-51a3e12846fe', 'name': 'foo', 'error': None, 'result': {'bar': 'triggered by foo'}, 'interrupts': []}
- 3.[tasks] {'id': 'ca9f30fb-ebea-87bf-074c-9a397616125a', 'name': 'bar1', 'input': {}, 'triggers': ('bar',)}
- 4.[tasks] {'id': 'c4540722-5a7a-4df3-14e1-abe0fddc9d73', 'name': 'bar2', 'input': {}, 'triggers': ('bar',)}
- 5.[tasks] {'id': 'c4540722-5a7a-4df3-14e1-abe0fddc9d73', 'name': 'bar2', 'error': None, 'result': {'output': ['bar2']}, 'interrupts': []}
- 6.[tasks] {'id': 'ca9f30fb-ebea-87bf-074c-9a397616125a', 'name': 'bar1', 'error': None, 'result': {'output': ['bar1']}, 'interrupts': []}
- 1.[custom] node 'foo' is called.
- 2.[custom] node 'bar2' is called.
- 3.[custom] node 'bar1' is called.
- 1.[updates] {'foo': {'bar': 'triggered by foo'}}
- 2.[updates] {'bar2': {'output': ['bar2']}}
- 3.[updates] {'bar1': {'output': ['bar1']}}
- 1.[values] {'foo': None, 'bar': 'triggered by foo', 'output': []}
- 2.[values] {'foo': None, 'bar': 'triggered by foo', 'output': ['bar1', 'bar2']}
复制代码 来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |