前言
在实际业务中,根据 tracking_id 追溯一条请求的完整处理路径是比较常见的需求。借助 Flask 自带的全局对象 g 以及钩子函数,可以很容易地为每条请求添加 tracking_id,并在日志中自动记录。
主要内容:
- 如何为每条请求添加 tracking_id
- 如何为日志自动添加 tracking_id 记录
- 如何自定义响应类,实现统一的响应格式,并在响应头中添加 tracking_id
- 视图函数单元测试示例
- Gunicorn 配置
项目结构
虽然内容看起来很多,但 tracking_id 的实现其实很简单。本文按照生产项目的规范组织了代码,添加了 Gunicorn 配置和单元测试代码,以及规范了日志格式和 JSON 响应格式。
- ├── apis
- │ ├── common
- │ │ ├── common.py
- │ │ └── __init__.py
- │ └── __init__.py
- ├── gunicorn.conf.py
- ├── handles
- │ └── user.py
- ├── logs
- │ ├── access.log
- │ └── error.log
- ├── main.py
- ├── middlewares
- │ ├── __init__.py
- │ └── tracking_id.py
- ├── pkgs
- │ └── log
- │ ├── app_log.py
- │ └── __init__.py
- ├── pyproject.toml
- ├── pytest.ini
- ├── README.md
- ├── responses
- │ ├── __init__.py
- │ └── json_response.py
- ├── tests
- │ └── apis
- │ └── test_common.py
- ├── tmp
- │ └── gunicorn.pid
- └── uv.lock
复制代码 安装依赖
- uv add flask
- uv add gunicorn gevent # 生产环境部署一般依赖这两个
- uv add --dev pytest # 测试库
复制代码 实现添加 tracking_id 的中间件
代码文件:middlewares/tracking_id.py
- from uuid import uuid4
- from flask import Flask, Response, g, request
- def tracking_id_middleware(app: Flask):
- """
- 跟踪 ID 中间件
- 为每个请求生成或获取跟踪 ID,用于追踪请求链路
- """
-
- @app.before_request
- def tracking_id_before_request():
- """
- 请求前处理函数
- 检查请求头中是否包含 X-Tracking-ID,如果没有则生成一个新的 UUID 作为跟踪 ID
- 并将其存储到 Flask 的全局对象 g 中,供后续处理使用
- """
- # 从请求头中获取 X-Tracking-ID
- tracking_id = request.headers.get("X-Tracking-ID")
- if not tracking_id:
- # 如果请求头中没有 X-Tracking-ID,则生成一个新的 UUID
- tracking_id = str(uuid4())
- # 将跟踪 ID 存储到 Flask 的全局对象 g 中,供后续处理使用
- g.tracking_id = tracking_id
- @app.after_request
- def tracking_id_after_request(response: Response):
- """
- 请求后处理函数
- 将跟踪 ID 添加到响应头中,以便客户端知道本次请求的跟踪 ID
- """
- # 检查响应头中是否已经有 X-Tracking-ID
- tracking_id = response.headers.get("X-Tracking-ID", "")
- if not tracking_id:
- # 如果响应头中没有 X-Tracking-ID,则从全局对象 g 中获取
- tracking_id = g.get("tracking_id", "")
- # 将跟踪 ID 添加到响应头中
- response.headers["X-Tracking-ID"] = tracking_id
- return response
- # 返回应用实例
- return app
复制代码 代码文件 middlewares/__init__.py,方便其他模块导入
- from .tracking_id import tracking_id_middleware
- __all__ = [
- "tracking_id_middleware",
- ]
复制代码 日志模块 - 自动记录 tracking_id
实现一个简单的输出到控制台的日志模块,日志格式为 JSON,自动添加 tracking_id 到日志中,避免手动在 logger.info() 这类方法中传入 tracking_id。
代码文件 pkgs/log/app_log.py
- import json
- import logging
- import sys
- from flask import g
- class JSONFormatter(logging.Formatter):
- """日志格式化器,输出 JSON 格式的日志。"""
- def format(self, record: logging.LogRecord) -> str:
- log_record = {
- "@timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S%z"),
- "level": record.levelname,
- "name": record.name,
- # "processName": record.processName, # 如需记录进程名可取消注释
- "tracking_id": getattr(record, "tracking_id", None),
- "loc": "%s:%d" % (record.filename, record.lineno),
- "func": record.funcName,
- "message": record.getMessage(),
- }
- return json.dumps(log_record, ensure_ascii=False, default=str)
- class TrackingIDFilter(logging.Filter):
- """日志过滤器,为日志记录添加 tracking_id。"""
- def filter(self, record):
- record.tracking_id = g.get("tracking_id", None)
- return True
- def _setup_console_handler(level: int) -> logging.StreamHandler:
- """设置控制台日志处理器。
- Args:
- level (int): 日志级别。
- """
- handler = logging.StreamHandler(sys.stdout)
- handler.setLevel(level)
- handler.setFormatter(JSONFormatter())
- return handler
- def setup_app_logger(level: int = logging.INFO, name: str = "app") -> logging.Logger:
- logger = logging.getLogger(name)
- if logger.hasHandlers():
- return logger
- logger.setLevel(level)
- logger.propagate = False
- logger.addHandler(_setup_console_handler(level))
- logger.addFilter(TrackingIDFilter())
- return logger
复制代码 在 pkgs/log/__init__.py 中初始化 logger,实现单例调用。
- from .app_log import setup_app_logger
- logger = setup_app_logger()
- __all__ = ["logger"]
复制代码 自定义响应类
规范 JSON 类型的响应格式,并在响应头中添加 X-Tracking-ID 和 X-DateTime。
代码文件 responses/json_response.py
- import json
- from datetime import datetime
- from http import HTTPStatus
- from typing import Any
- from flask import Response, g, request
- class JsonResponse(Response):
- def __init__(
- self,
- data: Any = None,
- code: HTTPStatus = HTTPStatus.OK,
- msg: str = "this is a json response",
- ):
- x_tracking_id = g.get("tracking_id", "")
- x_datetime = datetime.now().astimezone().isoformat(timespec="seconds")
- resp_headers = {
- "Content-Type": "application/json",
- "X-Tracking-ID": x_tracking_id,
- "X-DateTime": x_datetime,
- }
- try:
- resp = json.dumps(
- {
- "code": code.value,
- "msg": msg,
- "data": data,
- },
- ensure_ascii=False,
- default=str,
- )
- except Exception as e:
- resp = json.dumps(
- {
- "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
- "msg": f"Response serialization error: {str(e)}",
- "data": None,
- }
- )
- super().__init__(response=resp, status=code.value, headers=resp_headers)
- class Success(JsonResponse):
- def __init__(self, data: Any = None, msg: str = ""):
- if not msg:
- msg = f"{request.method} {request.path} success"
- super().__init__(data=data, code=HTTPStatus.OK, msg=msg)
- class Fail(JsonResponse):
- def __init__(self, msg: str = "", data: Any = None):
- if not msg:
- msg = f"{request.method} {request.path} failed"
- super().__init__(data=data, code=HTTPStatus.INTERNAL_SERVER_ERROR, msg=msg)
- class ArgumentNotFound(JsonResponse):
- def __init__(self, msg: str = "", data: Any = None):
- if not msg:
- msg = f"{request.method} {request.path} argument not found"
- super().__init__(data=data, code=HTTPStatus.BAD_REQUEST, msg=msg)
- class ArgumentInvalid(JsonResponse):
- def __init__(self, msg: str = "", data: Any = None):
- if not msg:
- msg = f"{request.method} {request.path} argument invalid"
- super().__init__(data=data, code=HTTPStatus.BAD_REQUEST, msg=msg)
- class AuthFailed(JsonResponse):
- """HTTP 状态码: 401"""
- def __init__(self, msg: str = "", data: Any = None):
- if not msg:
- msg = f"{request.method} {request.path} auth failed"
- super().__init__(data=data, code=HTTPStatus.UNAUTHORIZED, msg=msg)
- class ResourceConflict(JsonResponse):
- """HTTP 状态码: 409"""
- def __init__(self, msg: str = "", data: Any = None):
- if not msg:
- msg = f"{request.method} {request.path} resource conflict"
- super().__init__(data=data, code=HTTPStatus.CONFLICT, msg=msg)
- class ResourceNotFound(JsonResponse):
- """HTTP 状态码: 404"""
- def __init__(self, msg: str = "", data: Any = None):
- if not msg:
- msg = f"{request.method} {request.path} resource not found"
- super().__init__(data=data, code=HTTPStatus.NOT_FOUND, msg=msg)
- class ResourceForbidden(JsonResponse):
- """HTTP 状态码: 403"""
- def __init__(self, msg: str = "", data: Any = None):
- if not msg:
- msg = f"{request.method} {request.path} resource forbidden"
- super().__init__(data=data, code=HTTPStatus.FORBIDDEN, msg=msg)
复制代码 代码文件 responses/__init__.py,方便其他模块调用。
- from .json_response import (
- ArgumentInvalid,
- ArgumentNotFound,
- AuthFailed,
- Fail,
- JsonResponse,
- ResourceConflict,
- ResourceForbidden,
- ResourceNotFound,
- Success,
- )
- __all__ = [
- "JsonResponse",
- "Success",
- "Fail",
- "ArgumentNotFound",
- "ArgumentInvalid",
- "AuthFailed",
- "ResourceConflict",
- "ResourceNotFound",
- "ResourceForbidden",
- ]
复制代码 编写视图函数
代码文件 apis/common/common.py。以下定义了 5 个路由,主要用于测试响应类是否正常返回 JSON 格式。
- from datetime import datetime
- from flask import Blueprint
- from handles import user as user_handle
- from pkgs.log import logger
- from responses import Success
- route = Blueprint("common_apis", __name__, url_prefix="/api")
- @route.get("/health")
- def health_check():
- # print(g.get("tracking_id", "no-tracking-id"))
- logger.info("Health check")
- return Success(data="OK")
- @route.get("/users")
- def get_users():
- users = user_handle.get_users()
- return Success(data=users)
- @route.get("/names")
- def get_names():
- names = ["Alice", "Bob", "Charlie"]
- return Success(data=names)
- @route.get("/item")
- def get_item():
- item = {"id": 101, "name": "Sample Item", "price": 29.99, "now": datetime.now()}
- return Success(data=item)
- @route.get("/error")
- def get_error():
- raise Exception("This is a test exception")
复制代码 GET /api/users 调用了 handles/ 中的代码,模拟查询数据库。handles/user.py 中的代码如下:
- import time
- from typing import Any, Dict, List
- def get_users() -> List[Dict[str, Any]]:
- # 模拟查询用户数据
- time.sleep(0.1) # 模拟延迟
- users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
- return users
复制代码 代码文件 apis/common/__init__.py 中导入各个蓝图并统一暴露。由于示例代码只定义了一个蓝图,所以这里写得很简单。如果有多个蓝图,可以把蓝图都添加到一个列表中,在 Flask 应用中一次性遍历注册。
- from .common import route
- # from .common import route as common_route
- # routes = [
- # common_route,
- # ]
- __all__ = ["route"]
复制代码 代码文件 apis/__init__.py 中提供 Flask 应用的工厂函数。
- import traceback
- from flask import Flask
- from apis.common import route as common_route
- from middlewares import tracking_id_middleware
- from responses import Fail, ResourceNotFound
- from pkgs.log import logger
- # 错误处理器
- def error_handler_notfound(error):
- return ResourceNotFound()
- def error_handler_generic(error):
- logger.error(traceback.format_exc())
- return Fail(data=str(error))
- def create_app() -> Flask:
- app = Flask(__name__)
- # 注册中间件
- app = tracking_id_middleware(app)
- # 注册错误处理器
- app.errorhandler(Exception)(error_handler_generic)
- app.errorhandler(404)(error_handler_notfound)
- # 注册蓝图
- app.register_blueprint(common_route)
- return app
- __all__ = [
- "create_app",
- ]
复制代码 入口代码文件 main.py
- from apis import create_app
- app = create_app()
- if __name__ == "__main__":
- app.run(host="127.0.0.1", port=8000, debug=False)
复制代码 简单运行测试
- # 方式1, 直接启动, 用于简单测试
- python main.py
- # 方式2, 使用 gunicorn, 这是生产环境启动方式. 配置文件默认路径即 ./gunicorn.conf.py
- gunicorn main:app
复制代码
- curl 请求 /api/health。可以看到响应头中已经有了 X-Tracking-ID 和 X-DateTime
- $ curl -v http://127.0.0.1:8000/api/health
- * Trying 127.0.0.1:8000...
- * Connected to 127.0.0.1 (127.0.0.1) port 8000
- * using HTTP/1.x
- > GET /api/health HTTP/1.1
- > Host: 127.0.0.1:8000
- > User-Agent: curl/8.14.1
- > Accept: */*
- >
- * Request completely sent off
- < HTTP/1.1 200 OK
- < Server: gunicorn
- < Date: Sat, 17 Jan 2026 08:41:07 GMT
- < Connection: keep-alive
- < Content-Type: application/json
- < X-Tracking-ID: 1f0adb8d-9bee-49d4-873f-31aa1437da60
- < X-DateTime: 2026-01-17T16:41:07+08:00
- < Content-Length: 61
- <
- * Connection #0 to host 127.0.0.1 left intact
- {"code": 200, "msg": "GET /api/health success", "data": "OK"}
复制代码
- curl 请求 /api/users。手动指定请求头中的 X-Tracking-ID,响应时也会保持相同的 ID。
- $ curl -v http://127.0.0.1:8000/api/users -H 'X-Tracking-ID:123456'
- * Trying 127.0.0.1:8000...
- * Connected to 127.0.0.1 (127.0.0.1) port 8000
- * using HTTP/1.x
- > GET /api/users HTTP/1.1
- > Host: 127.0.0.1:8000
- > User-Agent: curl/8.14.1
- > Accept: */*
- > X-Tracking-ID:123456
- >
- * Request completely sent off
- < HTTP/1.1 200 OK
- < Server: gunicorn
- < Date: Sat, 17 Jan 2026 08:44:37 GMT
- < Connection: keep-alive
- < Content-Type: application/json
- < X-Tracking-ID: 123456
- < X-DateTime: 2026-01-17T16:44:37+08:00
- < Content-Length: 110
- <
- * Connection #0 to host 127.0.0.1 left intact
- {"code": 200, "msg": "GET /api/users success", "data": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}
复制代码 编写单元测试
使用 pytest 进行单元测试,这里只是一个简单的示例
配置 pytest
配置文件 pytest.ini
- [pytest]
- testpaths = "tests"
- pythonpath = "."
复制代码 测试代码
代码文件 tests/apis/test_common.py
- from typing import Generator
- from unittest.mock import MagicMock, patch
- import pytest
- from flask import Flask
- from flask.testing import FlaskClient
- from apis.common import route as common_route
- @pytest.fixture
- def app() -> Generator[Flask, None, None]:
- app = Flask(__name__)
- app.config.update(
- {
- "TESTING": True,
- "DEBUG": False,
- }
- )
- app.register_blueprint(common_route)
- yield app
- @pytest.fixture
- def client(app: Flask) -> FlaskClient:
- return app.test_client()
- class TestGetHealth:
- def test_get_health_success(self, client: FlaskClient) -> None:
- resp = client.get("/api/health")
- assert resp.status_code == 200
- resp_headers = resp.headers
- assert resp_headers.get("Content-Type") == "application/json"
- assert "X-Tracking-ID" in resp_headers
- assert "X-DateTime" in resp_headers
- resp_body = resp.json
- assert resp_body == {
- "code": 200,
- "msg": "GET /api/health success",
- "data": "OK",
- }
- class TestGetUsers:
- @patch("apis.common.common.user_handle.get_users")
- def test_get_users(self, mock_get_users: MagicMock, client: FlaskClient) -> None:
- # mock user.get_users() 的返回值
- mock_get_users.return_value = [
- {"id": 1, "name": "Alice123"},
- {"id": 2, "name": "Bob456"},
- ]
- # 发送请求
- resp = client.get("/api/users")
- assert resp.status_code == 200
- resp_headers = resp.headers
- assert resp_headers.get("Content-Type") == "application/json"
- assert "X-Tracking-ID" in resp_headers
- assert "X-DateTime" in resp_headers
- # resp_body = resp.json
- mock_get_users.assert_called_once()
复制代码 执行测试
配置 Gunicorn
代码文件 gunicorn.conf.py。简单配置了一些启动参数,以及请求日志的格式。
- # Gunicorn 配置文件
- from pathlib import Path
- from multiprocessing import cpu_count
- import gunicorn.glogging
- from datetime import datetime
- class CustomLogger(gunicorn.glogging.Logger):
- def atoms(self, resp, req, environ, request_time):
- """
- 重写 atoms 方法来自定义日志占位符
- """
- # 获取默认的所有占位符数据
- atoms = super().atoms(resp, req, environ, request_time)
-
- # 自定义 't' (时间戳) 的格式
- now = datetime.now().astimezone()
- atoms['t'] = now.isoformat(timespec="seconds")
-
- return atoms
-
- # 预加载应用代码
- preload_app = True
- # 工作进程数量:通常是 CPU 核心数的 2 倍加 1
- # workers = int(cpu_count() * 2 + 1)
- workers = 2
- # 使用 gevent 异步 worker 类型,适合 I/O 密集型应用
- # 注意:gevent worker 不使用 threads 参数,而是使用协程进行并发处理
- worker_class = "gevent"
- # 每个 gevent worker 可处理的最大并发连接数
- worker_connections = 2000
- # 绑定地址和端口
- bind = "127.0.0.1:8000"
- # 进程名称
- proc_name = "flask-dev"
- # PID 文件路径
- pidfile = str(Path(__file__).parent / "tmp" / "gunicorn.pid")
- logger_class = CustomLogger
- access_log_format = (
- '{"@timestamp": "%(t)s", '
- '"remote_addr": "%(h)s", '
- '"protocol": "%(H)s", '
- '"host": "%({host}i)s", '
- '"request_method": "%(m)s", '
- '"request_path": "%(U)s", '
- '"status_code": %(s)s, '
- '"response_length": %(b)s, '
- '"referer": "%(f)s", '
- '"user_agent": "%(a)s", '
- '"x_tracking_id": "%({x-tracking-id}i)s", '
- '"request_time": %(L)s}'
- )
- # 访问日志路径
- accesslog = str(Path(__file__).parent / "logs" / "access.log")
- # 错误日志路径
- errorlog = str(Path(__file__).parent / "logs" / "error.log")
- # 日志级别
- loglevel = "debug"
复制代码 输出的日志格式。可以看到日志格式符合 JSON 规范,便于 Filebeat 收集后在 Kibana 上检索。
- $ tail -n 1 logs/access.log | python3 -m json.tool
- {
- "@timestamp": "2026-01-17T16:44:37+08:00",
- "remote_addr": "127.0.0.1",
- "protocol": "HTTP/1.1",
- "host": "127.0.0.1:8000",
- "request_method": "GET",
- "request_path": "/api/users",
- "status_code": 200,
- "response_length": 110,
- "referer": "-",
- "user_agent": "curl/8.14.1",
- "x_tracking_id": "123456",
- "request_time": 0.102042
- }
复制代码 补充
全局对象 g 的注意事项
- g 不是进程或线程共享的全局变量,请只在请求处理流程中使用 g。
- 如果视图函数中启动了后台线程或异步任务,在子线程中直接访问 g 通常会报错或获取不到数据。这时建议显式传递数据。
- 不要在 g 中存储大文件或数据对象,否则会占用过高内存。
- g 不是 session。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |