← Назад к вопросам

Почему асинхронная очередь возникнет в БД?

1.3 Junior🔥 111 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Почему асинхронная очередь возникает в БД

Это фундаментальная проблема в распределённых системах, которая возникает из-за рассогласования между скоростью записи и скоростью обработки данных. Рассмотрим механизм этого явления.

Суть проблемы

В асинхронной архитектуре задачи записываются в БД быстрее, чем обрабатываются рабочими процессами. Это приводит к накоплению необработанных записей.

Производство задач (producer) → БД (очередь) → Обработка (worker)
   ████████ (скорость: 1000 задач/сек)
              ████ (скорость: 100 задач/сек)
   
   Разница: 900 задач/сек остаются в БД

Сценарий 1: Всплеск трафика

# Например, вебхук от платёжной системы
from fastapi import FastAPI
from sqlalchemy.orm import Session
from app.models import PaymentTask

app = FastAPI()

@app.post("/webhook/payment")
async def process_payment_webhook(data: dict, db: Session):
    # ✅ Быстрая запись в БД (< 1мс)
    task = PaymentTask(
        user_id=data[user_id],
        amount=data[amount],
        status=pending,
        created_at=datetime.utcnow()
    )
    db.add(task)
    db.commit()
    return {"ok": True}

# Но обработка может быть медленной
def process_payment_task(task_id: int):
    task = PaymentTask.query.get(task_id)
    # ❌ Медленная обработка (например, 100мс на валидацию, API запросы)
    validate_payment(task.amount)
    call_bank_api(task.user_id, task.amount)
    notify_user(task.user_id)
    task.status = completed
    db.commit()

Если одновременно приходит 10000 вебхуков в секунду, а обработка занимает 100мс:

  • Записано в БД: 10000 задач/сек
  • Обработано: 1000 / 0.1 = 10 воркеров × 10 задач = максимум 100-200 задач/сек
  • Очередь растёт на 9800+ задач в секунду

Сценарий 2: Медленный внешний API

class OrderProcessor:
    async def process_order(self, order_id: int):
        order = Order.query.get(order_id)
        
        # Быстро: запись в очередь (1мс)
        task = ProcessingTask(
            order_id=order_id,
            status=pending
        )
        db.add(task)
        db.commit()
        
        # Медленно: ждём внешний сервис (2-5 сек)
        try:
            inventory_response = requests.post(
                https://inventory-service.com/reserve,
                json={order_id: order_id},
                timeout=5  # ⚠️ Может быть даже дольше
            )
            inventory_response.raise_for_status()
        except requests.Timeout:
            # Задача остаётся в очереди
            task.status = failed
            db.commit()
            return
        
        # Даже если 50 воркеров, каждый занят на 5 секунд
        # Только 10 заказов обрабатываются в секунду
        # Если приходит 100 заказов в сек → очередь растёт

Сценарий 3: Недостаточно ресурсов

# Конфигурация Celery
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # один воркер = одна задача одновременно
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
CELERY_WORKER_CONCURRENCY = 4  # всего 4 параллельных задачи

# Если база данных PostgreSQL может обрабатывать 1000 операций/сек,
# а воркеры ограничены на 4 параллельных задачи,
# каждая из которых обращается к БД:
# → максимум 400 операций/сек из БД
# → остальные 600 op/sec остаются в очереди

Математика очереди

from dataclasses import dataclass
from typing import List

@dataclass
class QueueMetrics:
    arrival_rate: float  # задач/сек
    processing_rate: float  # задач/сек
    queue_size: int
    
    @property
    def is_stable(self) -> bool:
        """Очередь не растёт если processing >= arrival"""
        return self.processing_rate >= self.arrival_rate
    
    @property
    def queue_growth_per_second(self) -> float:
        """На сколько задач в сек растёт очередь"""
        return self.arrival_rate - self.processing_rate
    
    def estimate_wait_time(self) -> float:
        """Примерное время ожидания в секундах"""
        if self.processing_rate == 0:
            return float(inf)
        return self.queue_size / self.processing_rate

# Пример
metrics = QueueMetrics(
    arrival_rate=1000,  # 1000 задач в сек
    processing_rate=800,  # 800 задач в сек
    queue_size=0
)

print(f"Рост очереди: {metrics.queue_growth_per_second} задач/сек")  # 200
print(f"За 1 минуту очередь: {metrics.queue_growth_per_second * 60} задач")  # 12000
print(f"За 1 час: {metrics.queue_growth_per_second * 3600} задач")  # 720000

Пример из реальной системы

# Django ORM + Celery + PostgreSQL

from celery import shared_task
from django.core.mail import send_mail
from app.models import EmailTask

@shared_task(bind=True, max_retries=3)
def send_notification_email(self, user_id: int):
    try:
        user = User.objects.get(id=user_id)
        # ⚠️ Медленная операция: SMTP запрос (2-5 сек)
        send_mail(
            subject=Уведомление,
            message=Привет!,
            from_email=noreply@example.com,
            recipient_list=[user.email],
            fail_silently=False
        )
        EmailTask.objects.filter(id=user_id).update(status=sent)
    except Exception as exc:
        # Повтор с экспоненциальной задержкой
        raise self.retry(exc=exc, countdown=60)

# Если приходит 10000 писем в минуту:
# - Записано в БД: ~167 писем/сек
# - Отправлено через SMTP: ~2-5 писем/сек на одного воркера
# - С 10 воркерами: 20-50 писем/сек
# - Очередь растёт на 117-147 писем/сек

Как БД управляет очередью

# Типичная таблица очереди
CREATE TABLE task_queue (
    id BIGSERIAL PRIMARY KEY,
    task_type VARCHAR(100),
    payload JSONB,
    status VARCHAR(20),  -- pending, processing, completed, failed
    created_at TIMESTAMP DEFAULT NOW(),
    scheduled_at TIMESTAMP,
    attempts INT DEFAULT 0,
    last_error TEXT,
    updated_at TIMESTAMP
);

# Индексы для быстрого поиска
CREATE INDEX idx_task_status_created 
    ON task_queue(status, created_at)
    WHERE status = pending;

# Типичный запрос воркера
SELECT * FROM task_queue 
WHERE status = pending 
ORDER BY created_at ASC 
LIMIT 1 
FOR UPDATE SKIP LOCKED;  -- Важно для многопроцессности

# Если строк с status=pending миллионы → очередь переполнена

Решение проблемы

  1. Увеличить производительность воркеров

    • Добавить больше воркеров
    • Оптимизировать код обработки
    • Использовать асинхронные операции
  2. Снизить нагрузку на очередь

    • Фильтровать лишние задачи
    • Использовать rate limiting
    • Дебаунсинг частых событий
  3. Масштабирование

    • Использовать Redis вместо БД для очереди
    • Шардировать очередь
    • Распределённые воркеры
  4. Мониторинг

    • Отслеживать size очереди
    • Алерты при переполнении
    • Метрики latency обработки

Асинхронная очередь в БД — это нормальное явление, которое показывает несоответствие между производством и потреблением задач. Правильный мониторинг и масштабирование помогают держать очередь под контролем.