找回密码
 立即注册
首页 业界区 业界 FastAPI异步方法中调用同步方法

FastAPI异步方法中调用同步方法

筒霓暄 前天 16:15
前言

在异步方法中调用同步方法,会直接阻塞整个事件循环,导致应用在执行同步方法期间无法处理其他任何并发请求,从而拖垮整个服务的性能。
为了解决这个问题,核心思路是将同步方法交给外部线程池去执行。
方法1, 使用 to_thread

Python 3.9 后可以使用 asyncio.to_thread 方法,将同步函数跑在独立的线程中,并返回一个协程供 await
  1. import asyncio
  2. import time
  3. from fastapi import FastAPI
  4. app = FastAPI()
  5. def sync_task(name: str):
  6.     time.sleep(2)
  7.     return f"Hello {name}, sync task done!"
  8. @app.get("/async-call")
  9. async def async_endpoint():
  10.     result = await asyncio.to_thread(sync_task, "World")
  11.    
  12.     return {"message": result}
复制代码
方法2, 直接定义同步路由

FastAPI支持定义同步路由,FastAPI会自动在一个外部线程池中运行该函数。不过出于代码整体设计的考虑,个人不建议这么做。
方法3, 使用 run_in_threadpool

FastAPI 基于 Starlette, 而 Starlette 提供一个工具函数 run_in_threadpool,这种方式类似于 asyncio.to_thread,在某些老版本的 FastAPI 或特定的 contextvars 传递场景下更常用。
  1. from fastapi.concurrency import run_in_threadpool
  2. @app.get("/method3")
  3. async def starlette_endpoint():
  4.     result = await run_in_threadpool(sync_task, "Starlette")
  5.     return {"message": result}
复制代码
方法4, 使用进程池

对于CPU密集型任务,应该使用多进程ProcessPoolExecutor来操作
  1. import concurrent.futures
  2. import math
  3. from fastapi import FastAPI
  4. app = FastAPI()
  5. # 创建一个全局进程池
  6. executor = concurrent.futures.ProcessPoolExecutor()
  7. def cpu_intensive_calculation(n: int):
  8.     # 模拟重度 CPU 计算
  9.     return sum(math.isqrt(i) for i in range(n))
  10. @app.get("/cpu-bound-task")
  11. async def cpu_task():
  12.     loop = asyncio.get_running_loop()
  13.     result = await loop.run_in_executor(executor, cpu_intensive_calculation, 10**7)
  14.     return {"result": result}
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册