← Назад к вопросам

Как отслеживать trace ID между сервисами?

1.6 Junior🔥 81 комментариев
#Soft Skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Распределённое трассирование (Distributed Tracing) с Trace ID

Trace ID — это уникальный идентификатор, который следует за запросом через все микросервисы. Это критически важно для отладки и мониторинга в микросервисной архитектуре.

1. Генерация и передача Trace ID

Создание Trace ID во входящем запросе

import uuid
from fastapi import FastAPI, Request
from fastapi.middleware.base import BaseHTTPMiddleware
import logging
from contextvars import ContextVar

app = FastAPI()

# Контекстная переменная для хранения trace_id
trace_id_context: ContextVar[str] = ContextVar('trace_id', default=None)

# Middleware для извлечения или генерации trace_id
class TraceIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Проверяем, есть ли trace_id в заголовках
        trace_id = request.headers.get("x-trace-id")
        
        # Если нет — генерируем новый
        if not trace_id:
            trace_id = str(uuid.uuid4())
        
        # Сохраняем в контекстную переменную
        token = trace_id_context.set(trace_id)
        
        try:
            response = await call_next(request)
            # Добавляем trace_id в ответ
            response.headers["x-trace-id"] = trace_id
            return response
        finally:
            trace_id_context.reset(token)

app.add_middleware(TraceIDMiddleware)

2. Логирование с Trace ID

Настройка логгера

import logging
import json
from pythonjsonlogger import jsonlogger

# Получаем текущий trace_id
def get_trace_id() -> str:
    return trace_id_context.get() or "unknown"

# Форматтер для JSON логов
class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_record):
        super().add_fields(log_record, record, message_record)
        # Добавляем trace_id в каждый лог
        log_record['trace_id'] = get_trace_id()
        log_record['service'] = 'user-service'

# Настраиваем логгер
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
formatter = CustomJsonFormatter()
handler.setFormatter(formatter)
logger.addHandler(handler)

# Использование
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    logger.info(f"Fetching user {user_id}")  # trace_id добавится автоматически
    return {"id": user_id, "name": "John"}

3. Передача Trace ID между микросервисами

HTTP клиент с автоматической передачей trace_id

import httpx
from typing import Optional

class TracingHTTPClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.client = httpx.AsyncClient(base_url=base_url)
    
    async def get(self, url: str, **kwargs) -> dict:
        trace_id = get_trace_id()
        headers = kwargs.get('headers', {})
        headers['x-trace-id'] = trace_id
        kwargs['headers'] = headers
        
        logger.info(f"Making GET request to {url}")
        response = await self.client.get(url, **kwargs)
        return response.json()
    
    async def post(self, url: str, data: dict = None, **kwargs) -> dict:
        trace_id = get_trace_id()
        headers = kwargs.get('headers', {})
        headers['x-trace-id'] = trace_id
        kwargs['headers'] = headers
        
        logger.info(f"Making POST request to {url}")
        response = await self.client.post(url, json=data, **kwargs)
        return response.json()
    
    async def close(self):
        await self.client.aclose()

# Использование
user_service = TracingHTTPClient("http://user-service:8001")
order_service = TracingHTTPClient("http://order-service:8002")

@app.get("/orders/{order_id}")
async def get_order(order_id: int):
    # Получаем данные пользователя из другого сервиса
    # trace_id автоматически передаётся
    order = {"id": order_id, "user_id": 123, "total": 99.99}
    user = await user_service.get(f"/users/123")
    
    logger.info(f"Order fetched with user data")
    return {"order": order, "user": user}

4. Генерирование Span ID для отдельных операций

import uuid
from contextvars import ContextVar

span_id_context: ContextVar[str] = ContextVar('span_id', default=None)
parent_span_id_context: ContextVar[Optional[str]] = ContextVar('parent_span_id', default=None)

def start_span(operation_name: str) -> str:
    """Начинает новый span для операции."""
    span_id = str(uuid.uuid4())
    parent_span_id = span_id_context.get()
    
    # Сохраняем в контекст
    token = span_id_context.set(span_id)
    parent_token = parent_span_id_context.set(parent_span_id)
    
    logger.info(f"Starting span: {operation_name}", extra={
        "span_id": span_id,
        "parent_span_id": parent_span_id
    })
    
    return span_id

def end_span(span_id: str):
    """Заканчивает span."""
    logger.info(f"Ending span: {span_id}", extra={"span_id": span_id})

# Использование
@app.post("/orders")
async def create_order(order_data: dict):
    span_id = start_span("create_order")
    
    try:
        # Валидация
        validate_span = start_span("validate_order")
        # ... валидируем ...
        end_span(validate_span)
        
        # Сохранение в БД
        save_span = start_span("save_to_database")
        # ... сохраняем ...
        end_span(save_span)
        
        logger.info("Order created successfully")
        return {"id": 123, "status": "created"}
    finally:
        end_span(span_id)

5. Интеграция с OpenTelemetry

Профессиональное решение для enterprise:

from opentelemetry import trace, metrics
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

# Инициализация Jaeger экспортера
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger-agent",
    agent_port=6831,
)

trace_provider = TracerProvider()
trace_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
trace.set_tracer_provider(trace_provider)

# Автоматическое инструментирование
FastAPIInstrumentor.instrument_app(app)
SQLAlchemyInstrumentor().instrument()
HTTPXClientInstrumentor().instrument()

# Получение трейсера
tracer = trace.get_tracer(__name__)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    with tracer.start_as_current_span("get_user") as span:
        span.set_attribute("user_id", user_id)
        
        # Запрос к БД автоматически добавится как child span
        user = db.query(User).filter(User.id == user_id).first()
        
        span.set_attribute("user.name", user.name)
        return {"id": user.id, "name": user.name}

6. Корреляционные ID для разных типов событий

from enum import Enum
from contextvars import ContextVar

class ContextType(str, Enum):
    TRACE_ID = "trace_id"      # Запрос через все сервисы
    REQUEST_ID = "request_id"  # Уникален для одного запроса
    TENANT_ID = "tenant_id"    # Мультитенант приложение
    USER_ID = "user_id"        # Идентификатор пользователя

context_vars = {
    ContextType.TRACE_ID: ContextVar('trace_id'),
    ContextType.REQUEST_ID: ContextVar('request_id'),
    ContextType.TENANT_ID: ContextVar('tenant_id'),
    ContextType.USER_ID: ContextVar('user_id'),
}

class ContextMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Извлекаем из заголовков
        trace_id = request.headers.get("x-trace-id") or str(uuid.uuid4())
        request_id = request.headers.get("x-request-id") or str(uuid.uuid4())
        tenant_id = request.headers.get("x-tenant-id")
        user_id = request.headers.get("x-user-id")
        
        # Сохраняем в контекстные переменные
        tokens = []
        tokens.append(context_vars[ContextType.TRACE_ID].set(trace_id))
        tokens.append(context_vars[ContextType.REQUEST_ID].set(request_id))
        if tenant_id:
            tokens.append(context_vars[ContextType.TENANT_ID].set(tenant_id))
        if user_id:
            tokens.append(context_vars[ContextType.USER_ID].set(user_id))
        
        try:
            response = await call_next(request)
            response.headers["x-trace-id"] = trace_id
            response.headers["x-request-id"] = request_id
            return response
        finally:
            for token in tokens:
                context_vars[ContextType.TRACE_ID].reset(token)

app.add_middleware(ContextMiddleware)

# Использование в коде
def get_context() -> dict:
    return {
        "trace_id": context_vars[ContextType.TRACE_ID].get(),
        "request_id": context_vars[ContextType.REQUEST_ID].get(),
        "tenant_id": context_vars[ContextType.TENANT_ID].get(),
        "user_id": context_vars[ContextType.USER_ID].get(),
    }

7. Логирование в PostgreSQL

from sqlalchemy import Column, String, DateTime
from datetime import datetime

class RequestLog(Base):
    __tablename__ = "request_logs"
    
    id = Column(Integer, primary_key=True)
    trace_id = Column(String, nullable=False, index=True)
    request_id = Column(String, nullable=False, index=True)
    user_id = Column(Integer, nullable=True)
    endpoint = Column(String, nullable=False)
    method = Column(String, nullable=False)
    status_code = Column(Integer, nullable=False)
    duration_ms = Column(Float, nullable=False)
    timestamp = Column(DateTime, default=datetime.utcnow, index=True)

# Middleware для логирования
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        import time
        start_time = time.time()
        
        response = await call_next(request)
        
        duration_ms = (time.time() - start_time) * 1000
        trace_id = get_trace_id()
        request_id = context_vars[ContextType.REQUEST_ID].get()
        user_id = context_vars[ContextType.USER_ID].get()
        
        # Сохраняем в БД
        log_entry = RequestLog(
            trace_id=trace_id,
            request_id=request_id,
            user_id=user_id,
            endpoint=request.url.path,
            method=request.method,
            status_code=response.status_code,
            duration_ms=duration_ms
        )
        db.add(log_entry)
        db.commit()
        
        return response

8. Запрос логов по trace ID

# Пример: клиент нашёл ошибку
# "Где-то в 14:30 произошла ошибка при создании заказа"

# Находим trace_id по временной метке и событию
logs = db.query(RequestLog).filter(
    RequestLog.endpoint == "/orders",
    RequestLog.method == "POST",
    RequestLog.status_code != 200,
    RequestLog.timestamp.between(
        datetime(2024, 3, 22, 14, 25),
        datetime(2024, 3, 22, 14, 35)
    )
).all()

# Для каждого логгирования получаем все связанные логи
for log in logs:
    # Все логи с одинаковым trace_id = полная история запроса
    all_related_logs = db.query(RequestLog).filter(
        RequestLog.trace_id == log.trace_id
    ).order_by(RequestLog.timestamp).all()
    
    # Видим полный путь запроса через все сервисы
    for related in all_related_logs:
        print(f"{related.timestamp}: {related.service} - {related.endpoint} - {related.duration_ms}ms")

Best Practices

  • Всегда передавайте trace_id между сервисами
  • Генерируйте trace_id один раз на входном gateway
  • Используйте contextvars для thread-safe хранения
  • Логируйте trace_id везде (стандартизируйте формат)
  • Храните корреляционные ID в структурированных логах (JSON)
  • Используйте OpenTelemetry для enterprise решений
  • Индексируйте trace_id в логах для быстрого поиска
  • Выставляйте trace_id в ответе для отладки клиентом

Таблица заголовков (стандарт)

ЗаголовокНазначениеПример
X-Trace-IDТрассировка запроса550e8400-e29b-41d4-a716-446655440000
X-Request-IDУникален для каждого запросаreq_123abc
X-User-IDИдентификатор пользователя42
X-Tenant-IDМультитенант IDcustomer_xyz
X-Correlation-IDГруппировка связанных операцийcorr_xyz

Отслеживание trace ID — это основа наблюдаемости (observability) в микросервисных архитектурах!

Как отслеживать trace ID между сервисами? | PrepBro