找回密码
 立即注册
首页 业界区 业界 3.Langchain 1.2.0 学习 --- LCEL和Runnable

3.Langchain 1.2.0 学习 --- LCEL和Runnable

杓疠? 4 小时前
LangChain LCEL学习笔记

本来第三章应该学一下Tools和Agent的,后来发现这两个东西是核心,越学东西越多,于是就让G老师帮忙重新列了一下学习路线,按照G老师的路线,这一段应该学习LCEL相关的信息。回归正题,本文将带您深入探索 LCEL 与 Runnable 接口的奥秘。
1. 引言:从“链”说起

1.1 为什么要了解 LCEL?

在 LangChain 的世界里,有一个词贯穿始终——Chain(链)
想象一下:用户输入一段文字,你需要先做情感分析,再根据情感结果选择不同的回复策略,最后将结果格式化输出。如果不用 LCEL,你可能需要写一堆嵌套函数、回调逻辑,代码像意大利面条一样纠缠不清。
但有了 LCEL,一切变得清晰优雅:
  1. chain = (
  2.     sentiment_prompt    # 第一步:情感分析
  3.     | llm               # 第二步:LLM 处理
  4.     | output_parser     # 第三步:格式化输出
  5. )
复制代码
这就是 LCEL 的魅力所在——用管道操作符声明式地组合数据流,让复杂的 AI 工作流变得像搭积木一样简单。
1.2 LCEL 的核心优势

LCEL(LangChain Expression Language)不仅仅是一个语法糖,它为 LangChain 带来了质的飞跃:
优势说明声明式用管道操作符清晰表达数据流,阅读代码如读业务流程组合性轻松组合提示词、模型、输出解析器构建复杂链流式支持原生支持流式输出,用户体验更佳并行执行支持批量和并行处理,性能与效率兼得错误处理内置错误处理机制,链更健壮追踪支持与 LangSmith 无缝集成,调试不再抓狂2. LCEL 基础:一切的开始

2.1 核心概念:Runnable 接口

LCEL 的核心是 Runnable 接口。你可以把它想象成 LangChain 世界的“统一身份证”——无论是提示词模板、LLM 模型,还是输出解析器,都实现了这个接口。
这意味着它们都有共同的方法:
  1. # 基本调用
  2. chain.invoke(input)           # 同步调用
  3. chain.batch(inputs)           # 批量调用
  4. chain.stream(input)           # 流式调用
  5. # 异步调用
  6. await chain.ainvoke(input)    # 异步调用
  7. await chain.abatch(inputs)    # 异步批量调用
  8. async for chunk in chain.astream(input):  # 异步流式调用
复制代码
这就是 Runnable 的核心方法三剑客:invoke、batch、stream。无论你组合什么样的链,都用这几种方式调用。
2.2 第一个 LCEL 链

让我们用一个完整的例子来感受 LCEL 的魅力:
  1. from langchain_core.prompts import ChatPromptTemplate
  2. from langchain_core.output_parsers import StrOutputParser
  3. from langchain_openai import ChatOpenAI
  4. # 初始化模型
  5. base_url = "https://api.minimax.chat/v1"
  6. api_key = "your-api-key"
  7. model = ChatOpenAI(
  8.     model="MiniMax-M2.7",
  9.     base_url=base_url,
  10.     api_key=api_key,
  11. )
  12. # 构建链
  13. prompt = ChatPromptTemplate.from_template("用{language}翻译:{text}")
  14. output_parser = StrOutputParser()
  15. chain = prompt | model | output_parser
  16. # 调用链
  17. result = chain.invoke({
  18.     "language": "中文",
  19.     "text": "Hello, World!"
  20. })
  21. print(result)  # 你好,世界!
复制代码
这就是一个完整的 LCEL 链。数据像水流一样经过三个处理单元:prompt → model → output_parser。
数据流向是这样的:
  1. 输入 {"language": "中文", "text": "Hello, World!"}
  2.     ↓
  3. prompt.invoke() → ChatMessage
  4.     ↓
  5. model.invoke() → AIMessage
  6.     ↓
  7. output_parser.invoke() → String
  8.     ↓
  9. 输出 "你好,世界!"
复制代码
3. Runnable 家族详解

这是本文的重点之一。LangChain 提供了丰富的 Runnable 组件,它们像乐高积木一样可以任意组合。本章节深入了解每一个组件。
3.1 基础构建块

3.1.1 RunnableLambda:将函数变为可组合的组件

RunnableLambda 是最基础的工具,它的作用只有一个——把普通 Python 函数转换成 Runnable 对象
  1. from langchain_core.runnables import RunnableLambda
  2. # 定义一个普通函数
  3. def add_ten(x: int) -> int:
  4.     return x + 10
  5. # 转换为 Runnable
  6. r = RunnableLambda(add_ten)
  7. # 调用
  8. result = r.invoke(5)  # 15
复制代码
你也可以使用匿名函数:
  1. r = RunnableLambda(lambda x: x + 10)
  2. result = r.invoke(5)  # 15
复制代码
为什么要转换为 Runnable?
因为转换后,这个函数就可以和其他 Runnable 用管道操作符连接了
  1. def multiply_ten(x: int) -> int:
  2.     return x * 10
  3. # 链接两个 Runnable
  4. chain = RunnableLambda(add_ten) | RunnableLambda(multiply_ten)
  5. result = chain.invoke(1)  # (1 + 10) * 10 = 110
复制代码
3.1.2 管道操作符 |:像搭积木一样组装链

管道操作符是 LCEL 的灵魂。它的作用很直观:把左侧的输出传给右侧作为输入
  1. # 等价于:result = step2(step1(input))
  2. chain = step1 | step2
  3. # 多步管道
  4. chain = step1 | step2 | step3 | step4
复制代码
除了 | 操作符,还有 .pipe() 方法可以实现同样的效果:
  1. chain = r1.pipe(r2).pipe(r3)
复制代码
两者完全等价,看个人喜好选择。
3.1.3 三种调用方式:invoke / batch / stream

方法说明适用场景invoke单个输入 → 单个输出实时交互、简单场景batch列表输入 → 列表输出批量处理、效率优先stream单个输入 → 迭代器输出流式输出、用户体验
  1. # invoke: 单个输入 -> 单个输出
  2. result = chain.invoke(5)
  3. # batch: 批量输入 -> 列表输出(并行执行)
  4. results = chain.batch([1, 2, 3, 4, 5])
  5. # stream: 流式输出(迭代器)
  6. for item in chain.stream(5):
  7.     print(item)  # 逐步输出
复制代码
batch 会自动并行执行,非常适合需要处理大量数据的场景。如果担心 API 限流,可以用config={"max_concurrency": 3} 参数控制并发数,后面会讲一下config操作
3.2 并行与条件

3.2.1 RunnableParallel:多任务并行处理

有时候我们需要同时执行多个任务,比如同时获取翻译、总结、情感分析结果。RunnableParallel 就是为此而生。
  1. from langchain_core.runnables import RunnableParallel, RunnableLambda
  2. def add_ten(x: int) -> int:
  3.     return x + 10
  4. def multiply_ten(x: int) -> int:
  5.     return x * 10
  6. def square(x: int) -> int:
  7.     return x * x
  8. # 字典形式定义并行任务
  9. parallel = RunnableParallel({
  10.     "add": RunnableLambda(add_ten),
  11.     "mul": RunnableLambda(multiply_ten),
  12.     "square": RunnableLambda(square),
  13. })
  14. # 调用 - 所有任务并行执行
  15. result = parallel.invoke(5)
  16. # {'add': 15, 'mul': 50, 'square': 25}
复制代码
也可以用关键字参数的形式:
  1. parallel2 = RunnableParallel(
  2.     add=RunnableLambda(add_ten),
  3.     mul=RunnableLambda(multiply_ten),
  4. )
复制代码
实际应用场景:在 Agent 开发中,你可以用 RunnableParallel 同时调用多个工具(比如同时查询天气、股票、新闻),然后汇总结果。
3.2.2 RunnableBranch:条件分支(if-elif-else)

RunnableBranch 允许根据条件选择不同的处理路径。它的工作方式类似 if-elif-else 逻辑。
  1. from langchain_core.runnables import RunnableBranch, RunnableLambda
  2. def to_positive(x: int) -> str:
  3.     return f"{x} 是正数"
  4. def to_negative(x: int) -> str:
  5.     return f"{x} 是负数"
  6. def to_zero(x: int) -> str:
  7.     return f"{x} 是零"
  8. # 条件分支 - 最后一个参数作为默认分支
  9. branch = RunnableBranch(
  10.     (lambda x: x > 0, RunnableLambda(to_positive)),
  11.     (lambda x: x < 0, RunnableLambda(to_negative)),
  12.     RunnableLambda(to_zero),  # 默认分支
  13. )
  14. # 调用
  15. print(branch.invoke(5))   # '5 是正数'
  16. print(branch.invoke(-3))  # '-3 是负数'
  17. print(branch.invoke(0))   # '0 是零'
复制代码
在 Agent 开发中的应用:这是 Agent 决策的核心组件!根据用户输入或中间结果,Branch 可以决定:

  • 使用哪个工具
  • 调用哪个模型
  • 走哪个处理流程
RunnableBranch 的多种路由场景

下面展示 6+ 种实用的路由场景:
场景一:分数等级路由
  1. score_branch = RunnableBranch(
  2.     (lambda x: x >= 90, RunnableLambda(lambda x: f"等级A: {x}分")),
  3.     (lambda x: x >= 80, RunnableLambda(lambda x: f"等级B: {x}分")),
  4.     (lambda x: x >= 70, RunnableLambda(lambda x: f"等级C: {x}分")),
  5.     (lambda x: x >= 60, RunnableLambda(lambda x: f"等级D: {x}分")),
  6.     RunnableLambda(lambda x: f"等级F: {x}分"),  # 默认分支
  7. )
  8. scores = [95, 82, 75, 63, 45]
  9. for s in scores:
  10.     print(f"  {s}分 -> {score_branch.invoke(s)}")
  11. # 输出:
  12. # 95分 -> 等级A: 95分
  13. # 82分 -> 等级B: 82分
  14. # 75分 -> 等级C: 75分
  15. # 63分 -> 等级D: 63分
  16. # 45分 -> 等级F: 45分
复制代码
场景二:类型路由
  1. user_chain = RunnableLambda(lambda x: f"用户处理: {x['name']}")
  2. product_chain = RunnableLambda(lambda x: f"商品处理: {x['name']}")
  3. order_chain = RunnableLambda(lambda x: f"订单处理: {x['name']}")
  4. type_router = RunnableBranch(
  5.     (lambda x: x.get("type") == "user", user_chain),
  6.     (lambda x: x.get("type") == "product", product_chain),
  7.     (lambda x: x.get("type") == "order", order_chain),
  8.     RunnableLambda(lambda x: f"默认处理: {x}"),
  9. )
  10. print(type_router.invoke({"type": "user", "name": "Alex"}))
  11. # 输出:用户处理: Alex
复制代码
场景三:数值范围路由
  1. high_chain = RunnableLambda(lambda x: f"高值处理: {x * 10}")
  2. medium_chain = RunnableLambda(lambda x: f"中值处理: {x * 5}")
  3. low_chain = RunnableLambda(lambda x: f"低值处理: {x * 2}")
  4. value_router = RunnableBranch(
  5.     (lambda x: x > 100, high_chain),
  6.     (lambda x: x > 50, medium_chain),
  7.     low_chain,
  8. )
  9. print(value_router.invoke(60))   # 中值处理: 300
  10. print(value_router.invoke(150))   # 高值处理: 1500
复制代码
场景四:字符串前缀路由
  1. string_processor = RunnableBranch(
  2.     (lambda x: x.startswith("upper:"),
  3.      RunnableLambda(lambda x: x[6:].upper())),
  4.     (lambda x: x.startswith("lower:"),
  5.      RunnableLambda(lambda x: x[6:].lower())),
  6.     (lambda x: x.startswith("cap:"),
  7.      RunnableLambda(lambda x: x[4:].capitalize())),
  8.     RunnableLambda(lambda x: f"[默认] {x}"),
  9. )
  10. print(string_processor.invoke("upper:hello"))  # HELLO
  11. print(string_processor.invoke("lower:HELLO"))  # hello
  12. print(string_processor.invoke("cap:hello"))   # Hello
复制代码
3.2.3 RouterRunnable:简单 key→handler 映射

有时候只需要简单的 key 映射,不需要复杂的条件判断。RouterRunnable 就是为这种情况设计的。
  1. from langchain_core.runnables.router import RouterRunnable
  2. # 创建路由表:key -> Runnable
  3. router = RouterRunnable(runnables={
  4.     "add": RunnableLambda(lambda x: x + 10),
  5.     "multiply": RunnableLambda(lambda x: x * 10),
  6.     "square": RunnableLambda(lambda x: x ** 2),
  7. })
  8. # 输入必须是字典,包含 "key" 和 "input"
  9. result = router.invoke({"key": "add", "input": 5})
  10. print(result)  # 15
复制代码
RouterRunnable vs RunnableBranch 对比
特性RouterRunnableRunnableBranch路由方式根据 key 字段选择根据条件函数返回 True/False输入格式必须是 {"key": "...", "input": ...}任意类型灵活性低,key 必须是字符串高,可自定义任意条件复杂度简单,适合 handler 映射复杂,适合 if-elif-else选择建议

  • 需要简单的 key → handler 映射?用 RouterRunnable
  • 需要复杂的条件判断?用 RunnableBranch
3.3 数据流转神器

3.3.1 RunnablePassthrough:透传与字段选择

RunnablePassthrough 有两个主要用途:透传输入字段操作
基本透传
  1. from langchain_core.runnables import RunnablePassthrough, RunnableLambda
  2. def multiply_ten(x: int) -> int:
  3.     return x * 10
  4. # 透传当前值,同时进行其他处理
  5. chain = RunnableLambda(multiply_ten) | {
  6.     "original": RunnablePassthrough(),  # 透传原始输入
  7.     "processed": RunnableLambda(lambda x: x * 10),
  8. }
  9. result = chain.invoke(5)
  10. # {'original': 5, 'processed': 50}
复制代码
使用 .pick() 选择字段
  1. # 从字典中选择单个字段
  2. pick_name = RunnablePassthrough().pick("name")
  3. result = pick_name.invoke({"name": "Alex", "age": 30})
  4. # 'Alex'
  5. # 从字典中选择多个字段
  6. pick_multi = RunnablePassthrough().pick(["name", "age"])
  7. result = pick_multi.invoke({"name": "Alex", "age": 30})
  8. # {'name': 'Alex', 'age': 30}
复制代码
使用 .assign() 添加字段
  1. # assign - 添加字段
  2. chain = (
  3.     RunnableLambda(lambda x: {"input": x})
  4.     | RunnablePassthrough.assign(added=lambda x: x["input"] + 10)
  5. )
  6. result = chain.invoke(5)
  7. # {'input': 5, 'added': 15}
  8. # assign 多个字段
  9. chain_multi_assign = (
  10.     RunnableLambda(lambda x: {"original": x})
  11.     | RunnablePassthrough.assign(
  12.         doubled=lambda x: x["original"] * 2,
  13.         squared=lambda x: x["original"] ** 2,
  14.     )
  15. )
  16. result = chain_multi_assign.invoke(5)
  17. # {'original': 5, 'doubled': 10, 'squared': 25}
复制代码
3.3.2 RunnableSequence:显式序列创建

RunnableSequence 用于显式创建有序的 Runnable 序列,等同于使用 | 管道操作符。
  1. from langchain_core.runnables import RunnableSequence, RunnableLambda
  2. def add_ten(x: int) -> int:
  3.     return x + 10
  4. def multiply_ten(x: int) -> int:
  5.     return x * 10
  6. def square(x: int) -> int:
  7.     return x * x
  8. def to_string(x: int) -> str:
  9.     return f"结果是: {x}"
  10. # 显式创建序列
  11. sequence = RunnableSequence(
  12.     first=RunnableLambda(add_ten),
  13.     middle=[RunnableLambda(multiply_ten), RunnableLambda(square)],
  14.     last=RunnableLambda(to_string),
  15. )
  16. result = sequence.invoke(5)
  17. # 5 -> add_ten -> 15 -> multiply_ten -> 150 -> square -> 22500 -> to_string -> "结果是: 22500"
复制代码
3.4 批量处理

3.4.1 RunnableEach:序列元素逐个处理

RunnableEach 对输入序列中的每个元素执行同一个 Runnable。
  1. from langchain_core.runnables import RunnableLambda, RunnableEach
  2. def add_ten(x: int) -> int:
  3.     return x + 10
  4. def multiply_ten(x: int) -> int:
  5.     return x * 10
  6. # 基本用法 - 对每个元素执行 add_ten
  7. each_add = RunnableEach(bound=RunnableLambda(add_ten))
  8. result = each_add.invoke([1, 2, 3])
  9. # [11, 12, 13]
  10. # 链式组合
  11. chain_each = (
  12.     RunnableEach(bound=RunnableLambda(add_ten))
  13.     | RunnableEach(bound=RunnableLambda(multiply_ten))
  14. )
  15. result = chain_each.invoke([1, 2, 3])
  16. # [1, 2, 3] -> add_ten -> [11, 12, 13] -> multiply_ten -> [110, 120, 130]
复制代码
3.4.2 batch_as_completed:完成即返回

batch_as_completed 会在任务完成时立即返回,而不是等待所有任务完成。
  1. from langchain_core.runnables import RunnableLambda
  2. def slow_op(x):
  3.     import time
  4.     time.sleep(x * 0.1)
  5.     return x * 10
  6. chain = RunnableLambda(slow_op)
  7. # 普通 batch - 按顺序返回
  8. results = chain.batch([1, 2, 3, 4])
  9. # batch_as_completed - 哪个完成返回哪个
  10. for result in chain.batch_as_completed([1, 2, 3, 4]):
  11.     print(result)  # 按完成顺序打印结果
复制代码
4. 进阶功能:让链更健壮

这一章是让 LCEL 链真正用于生产环境的关键。想象一下:你的链在凌晨三点因为网络波动失败了,你希望它自动重试吗?你希望优雅地降级到备用模型吗?这一章告诉你怎么做。
4.1 容错机制

4.1.1 with_fallbacks:备用方案

with_fallbacks 为 Runnable 添加备用方案。当主 Runnable 抛出异常时,自动切换到备用方案。
  1. from langchain_core.runnables import RunnableLambda
  2. def always_fail(x):
  3.     raise RuntimeError(f"处理 {x} 时出错")
  4. def fallback_handler(x):
  5.     return f"备用方案处理: {x}"
  6. def call_gpt4(x):
  7.     return f"GPT-4: {x}"
  8. def call_gpt35(x):
  9.     return f"GPT-3.5: {x}"
  10. def call_local(x):
  11.     return f"Local: {x}"
  12. # 基本用法 - 单个备用
  13. chain = RunnableLambda(always_fail).with_fallbacks(
  14.     fallbacks=[RunnableLambda(fallback_handler)]
  15. )
  16. result = chain.invoke(5)  # '备用方案处理: 5'
  17. # 模型降级示例 - 多级备用
  18. model_chain = (
  19.     RunnableLambda(call_gpt4)
  20.     .with_fallbacks(fallbacks=[RunnableLambda(call_gpt35)])
  21.     .with_fallbacks(fallbacks=[RunnableLambda(call_local)])
  22. )
  23. result = model_chain.invoke("Hello")
  24. # 依次尝试 GPT-4 -> GPT-3.5 -> Local
复制代码
4.1.2 with_retry:重试机制

with_retry 为 Runnable 添加重试机制,适用于处理网络请求临时故障等场景。
  1. from langchain_core.runnables import RunnableLambda
  2. def might_fail_once(x: int) -> int:
  3.     """模拟可能失败一次的函数"""
  4.     if not hasattr(might_fail_once, 'called'):
  5.         might_fail_once.called = True
  6.         print("失败一次")
  7.         raise RuntimeError("第一次调用失败")
  8.     return x + 10
  9. def selective_fail(x):
  10.     if x < 0:
  11.         raise ConnectionError("连接失败")
  12.     return x * 2
  13. # 基本用法
  14. retry_once = RunnableLambda(might_fail_once).with_retry()
  15. result = retry_once.invoke(5)  # 10
  16. # 指定重试次数和异常类型
  17. retry_connection = RunnableLambda(selective_fail).with_retry(
  18.     retry_if_exception_type=(ConnectionError,),
  19.     stop_after_attempt=3,
  20. )
  21. # 指数退避 + 随机抖动
  22. retry_with_jitter = RunnableLambda(might_fail_once).with_retry(
  23.     wait_exponential_jitter=True,
  24.     stop_after_attempt=5,
  25. )
  26. # 链式组合中的重试
  27. chain_with_retry = (
  28.     RunnableLambda(add_ten)
  29.     | RunnableLambda(might_fail_once).with_retry()
  30.     | RunnableLambda(multiply_ten)
  31. )
复制代码
参数详解

  • retry_if_exception_type:指定需要重试的异常类型
  • stop_after_attempt:最大重试次数
  • wait_exponential_jitter:指数退避+随机抖动,避免惊群效应
4.2 配置与监控

4.2.1 RunnableConfig:执行配置

RunnableConfig 用于控制 Runnable 的执行行为,可以传递给 invoke、batch、stream 等方法。
  1. from langchain_core.runnables import RunnableLambda, RunnableConfig
  2. def add_ten(x: int) -> int:
  3.     return x + 10
  4. chain = RunnableLambda(add_ten)
  5. # tags - 标签管理
  6. config_tags = {"tags": ["user", "premium", "urgent"]}
  7. result = chain.invoke(5, config=config_tags)
  8. # max_concurrency - 最大并发数
  9. config_concurrency = {"max_concurrency": 2}
  10. results = chain.batch([1, 2, 3, 4], config=config_concurrency)
  11. # with_config 方法 - 在链中设置配置
  12. chain_with_config = (
  13.     RunnableLambda(add_ten).with_config(run_name="add_ten_step")
  14.     | RunnableLambda(lambda x: x * 10).with_config(run_name="multiply_step")
  15. )
复制代码
RunnableConfig 常用参数
参数说明典型用法tags标签管理区分生产/开发/测试环境metadata元数据追踪请求来源、用户IDmax_concurrency最大并发数控制 API 调用的并发度recursion_limit递归深度限制防止无限循环callbacks回调函数集成监控、日志run_name运行名称方便调试时识别4.2.2 with_listeners:生命周期监听器

with_listeners 允许在 Runnable 执行的不同阶段(开始,结束,失败)插入自定义逻辑。
  1. from langchain_core.runnables import RunnableLambda
  2. def on_start_callback(run):
  3.     print(f"[on_start] 开始执行 run_id={run.id}")
  4. def on_end_callback(run):
  5.     print(f"[on_end] 执行完成 run_id={run.id}")
  6. def on_error_callback(run):
  7.     print(f"[on_error] 出错: run_id={run.id}")
  8. chain = RunnableLambda(add_ten).with_listeners(
  9.     on_start=on_start_callback,
  10.     on_end=on_end_callback,
  11.     on_error=on_error_callback,
  12. )
  13. result = chain.invoke(5)
  14. # 输出:
  15. # [on_start] 开始执行 run_id=...
  16. # [on_end] 执行完成 run_id=...
复制代码
5. Runnable 在 Agent 开发中的应用

这一章我们来聊聊 Runnable 在 LangChain Agent 中的应用。Agent 是 LangChain 最强大的特性之一,而它背后的核心正是 Runnable 接口。
5.1 Agent 与 Runnable 的关系

想象一下 Agent 的工作流程:感知用户输入 → 决策下一步做什么 → 执行动作 → 获取结果 → 继续决策...
这个流程中的每一步,都可以用 Runnable 来表示:
  1. # Agent 的简化工作流
  2. agent_chain = (
  3.     prompt                           # 1. 感知:解析用户输入
  4.     | llm.with_tools(tools)          # 2. 决策:选择要执行的工具
  5.     | output_parser                 # 3. 解析:解析工具调用
  6.     | execute_tools                 # 4. 执行:运行选择的工具
  7.     | ...  # 循环往复
  8. )
复制代码
本质上,Agent 就是一条复杂的 Runnable 链。它之所以能灵活地处理各种任务,正是因为它建立在 Runnable 的组合能力之上。
5.2 动态路由在 Agent 中的应用

在 Agent 中,最常见的场景是根据条件决定下一步操作。
5.2.1 工具选择的条件判断
  1. from langchain_core.runnables import RunnableBranch, RunnableLambda
  2. # 根据用户意图选择不同工具
  3. def classify_intent(x):
  4.     return x.get("intent", "unknown")
  5. intent_router = RunnableBranch(
  6.     (lambda x: x.get("intent") == "weather",
  7.      RunnableLambda(lambda x: "调用天气API")),
  8.     (lambda x: x.get("intent") == "search",
  9.      RunnableLambda(lambda x: "调用搜索API")),
  10.     (lambda x: x.get("intent") == "calculator",
  11.      RunnableLambda(lambda x: "调用计算器")),
  12.     RunnableLambda(lambda x: "调用通用对话模型"),
  13. )
复制代码
5.2.2 多模型切换
  1. # 根据任务类型选择不同模型
  2. def select_model(x):
  3.     task_type = x.get("task_type", "general")
  4.     if task_type == "code":
  5.         return "gpt-4"
  6.     elif task_type == "creative":
  7.         return "claude-3"
  8.     return "gpt-3.5-turbo"
  9. model_router = RunnableBranch(
  10.     (lambda x: x.get("task_type") == "code",
  11.      llm.bind(model="gpt-4")),
  12.     (lambda x: x.get("task_type") == "creative",
  13.      llm.bind(model="claude-3")),
  14.     llm.bind(model="gpt-3.5-turbo"),
  15. )
复制代码
5.3 工具调用的容错处理

Agent 免不了要和外部工具打交道,而外部工具可能会失败。Runnable 的容错机制在这里大显身手。
5.3.1 工具失败时的备用方案
  1. from langchain_core.runnables import RunnableLambda
  2. def call_primary_api(x):
  3.     # 可能失败的 API 调用
  4.     raise ConnectionError("主API不可用")
  5. def call_backup_api(x):
  6.     return f"备用API返回: {x}"
  7. # 使用 with_fallbacks 处理工具失败
  8. safe_tool_chain = (
  9.     RunnableLambda(call_primary_api)
  10.     .with_fallbacks(fallbacks=[RunnableLambda(call_backup_api)])
  11. )
  12. result = safe_tool_chain.invoke("test")
  13. # 输出:备用API返回: test
复制代码
5.3.2 网络/API 临时故障重试
  1. def call_unreliable_api(x):
  2.     import random
  3.     if random.random() < 0.3:
  4.         raise ConnectionError("网络波动")
  5.     return f"成功: {x}"
  6. # 使用 with_retry 处理临时故障
  7. retryable_tool = (
  8.     RunnableLambda(call_unreliable_api)
  9.     .with_retry(
  10.         retry_if_exception_type=(ConnectionError,),
  11.         stop_after_attempt=3,
  12.         wait_exponential_jitter=True,
  13.     )
  14. )
复制代码
6. 实战案例

让我们通过几个实际案例,把学到的知识串联起来。
6.1 多语言翻译助手

这个案例展示如何使用 .batch() 批量处理多个翻译请求。
  1. from langchain_core.prompts import ChatPromptTemplate
  2. from langchain_openai import ChatOpenAI
  3. from langchain_core.output_parsers import StrOutputParser
  4. # 初始化模型
  5. base_url = "https://api.minimax.chat/v1"
  6. api_key = "your-api-key"
  7. model = ChatOpenAI(
  8.     model="MiniMax-M2.7",
  9.     base_url=base_url,
  10.     api_key=api_key,
  11. )
  12. # 创建翻译链
  13. translation_prompt = ChatPromptTemplate.from_template(
  14.     "将以下句子翻译成{language}:{text}"
  15. )
  16. output_parser = StrOutputParser()
  17. translation_chain = translation_prompt | model | output_parser
  18. # 批量翻译
  19. batch_inputs = [
  20.     {"language": "中文", "text": "Hello, how are you?"},
  21.     {"language": "日语", "text": "Hello, how are you?"},
  22.     {"language": "法语", "text": "Hello, how are you?"},
  23.     {"language": "德语", "text": "Hello, how are you?"},
  24. ]
  25. results = translation_chain.batch(batch_inputs)
  26. # 返回: ["你好,你怎么样?", "こんにちは、お元気ですか?", "Bonjour", "Wie geht es dir?"]
复制代码
6.2 多功能消息处理系统

这个案例展示如何使用 RunnableParallel 同时执行多个任务。
  1. from langchain_core.runnables import RunnableParallel
  2. from langchain_core.prompts import ChatPromptTemplate
  3. from langchain_openai import ChatOpenAI
  4. from langchain_core.output_parsers import StrOutputParser
  5. # 初始化模型
  6. model = ChatOpenAI(model="MiniMax-M2.7", base_url=base_url, api_key=api_key)
  7. output_parser = StrOutputParser()
  8. # 创建多任务链
  9. multi_task_chain = RunnableParallel({
  10.     "translation": (
  11.         ChatPromptTemplate.from_template("翻译成中文:{text}")
  12.         | model
  13.         | StrOutputParser()
  14.     ),
  15.     "summary": (
  16.         ChatPromptTemplate.from_template("用一句话总结:{text}")
  17.         | model
  18.         | StrOutputParser()
  19.     ),
  20.     "sentiment": (
  21.         ChatPromptTemplate.from_template("分析情感(正面/负面/中性):{text}")
  22.         | model
  23.         | StrOutputParser()
  24.     ),
  25. })
  26. # 一次调用,同时获得翻译、总结和情感分析
  27. result = multi_task_chain.invoke({
  28.     "text": "LangChain is an amazing framework for building LLM applications!"
  29. })
  30. # 返回:
  31. # {
  32. #     "translation": "LangChain 是一个用于构建 LLM 应用的惊人框架!",
  33. #     "summary": "介绍 LangChain 框架的强大功能",
  34. #     "sentiment": "正面"
  35. # }
复制代码
6.3 带验证的 AI 对话链

这个案例展示如何使用 RunnableLambda 添加输入验证。
  1. from langchain_core.runnables import RunnableLambda
  2. from langchain_core.prompts import ChatPromptTemplate
  3. from langchain_openai import ChatOpenAI
  4. from langchain_core.output_parsers import StrOutputParser
  5. model = ChatOpenAI(model="MiniMax-M2.7", base_url=base_url, api_key=api_key)
  6. def validate_input(input_dict):
  7.     if not input_dict.get("topic"):
  8.         raise ValueError("topic 不能为空")
  9.     if len(input_dict["topic"]) > 100:
  10.         raise ValueError("topic 长度不能超过100")
  11.     return input_dict
  12. validation_chain = (
  13.     RunnableLambda(validate_input)
  14.     | ChatPromptTemplate.from_template("写一首关于{topic}的诗")
  15.     | model
  16.     | StrOutputParser()
  17. )
  18. # 正常调用
  19. result = validation_chain.invoke({"topic": "春天"})
  20. # 验证失败会抛出 ValueError
  21. # validation_chain.invoke({"topic": ""})  # ValueError: topic 不能为空
复制代码
7. 总结

7.1 Langchain中内置Runnable 类型

核心 Runnable 类

类/方法功能RunnableLambda将普通函数转为 RunnableRunnableParallel并行执行多个 RunnableRunnablePassthrough透传输入或选择字段RunnableBranch条件分支与动态路由RouterRunnable基于 key 的简单路由RunnableSequence显式创建有序序列RunnableEach对序列中每个元素执行 Runnable.with_fallbacks()添加容错备用方案.with_retry()添加重试机制.with_listeners()添加生命周期监听器.with_config()设置执行配置.pick()从字典中选择字段.assign()向字典添加字段提示模板与输出解析器

类型说明导入方式ChatPromptTemplate聊天提示模板from langchain_core.prompts import ChatPromptTemplatePromptTemplate文本提示模板from langchain_core.prompts import PromptTemplateStrOutputParser字符串输出解析器from langchain_core.output_parsers import StrOutputParserJsonOutputParserJSON 输出解析器from langchain_core.output_parsers import JsonOutputParserXMLOutputParserXML 输出解析器from langchain_core.output_parsers import XMLOutputParserLLM 模型

类型说明导入方式ChatOpenAIOpenAI 聊天模型from langchain_openai import ChatOpenAIOpenAIOpenAI 文本模型from langchain_openai import OpenAIChatAnthropicAnthropic 聊天模型from langchain_anthropic import ChatAnthropicChatVertexAIGoogle Vertex AI 模型from langchain_google_vertexai import ChatVertexAI<blockquote>

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册