Почему асинхронная очередь возникнет в БД?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Почему асинхронная очередь возникает в БД
Это фундаментальная проблема в распределённых системах, которая возникает из-за рассогласования между скоростью записи и скоростью обработки данных. Рассмотрим механизм этого явления.
Суть проблемы
В асинхронной архитектуре задачи записываются в БД быстрее, чем обрабатываются рабочими процессами. Это приводит к накоплению необработанных записей.
Производство задач (producer) → БД (очередь) → Обработка (worker)
████████ (скорость: 1000 задач/сек)
████ (скорость: 100 задач/сек)
Разница: 900 задач/сек остаются в БД
Сценарий 1: Всплеск трафика
# Например, вебхук от платёжной системы
from fastapi import FastAPI
from sqlalchemy.orm import Session
from app.models import PaymentTask
app = FastAPI()
@app.post("/webhook/payment")
async def process_payment_webhook(data: dict, db: Session):
# ✅ Быстрая запись в БД (< 1мс)
task = PaymentTask(
user_id=data[user_id],
amount=data[amount],
status=pending,
created_at=datetime.utcnow()
)
db.add(task)
db.commit()
return {"ok": True}
# Но обработка может быть медленной
def process_payment_task(task_id: int):
task = PaymentTask.query.get(task_id)
# ❌ Медленная обработка (например, 100мс на валидацию, API запросы)
validate_payment(task.amount)
call_bank_api(task.user_id, task.amount)
notify_user(task.user_id)
task.status = completed
db.commit()
Если одновременно приходит 10000 вебхуков в секунду, а обработка занимает 100мс:
- Записано в БД: 10000 задач/сек
- Обработано: 1000 / 0.1 = 10 воркеров × 10 задач = максимум 100-200 задач/сек
- Очередь растёт на 9800+ задач в секунду
Сценарий 2: Медленный внешний API
class OrderProcessor:
async def process_order(self, order_id: int):
order = Order.query.get(order_id)
# Быстро: запись в очередь (1мс)
task = ProcessingTask(
order_id=order_id,
status=pending
)
db.add(task)
db.commit()
# Медленно: ждём внешний сервис (2-5 сек)
try:
inventory_response = requests.post(
https://inventory-service.com/reserve,
json={order_id: order_id},
timeout=5 # ⚠️ Может быть даже дольше
)
inventory_response.raise_for_status()
except requests.Timeout:
# Задача остаётся в очереди
task.status = failed
db.commit()
return
# Даже если 50 воркеров, каждый занят на 5 секунд
# Только 10 заказов обрабатываются в секунду
# Если приходит 100 заказов в сек → очередь растёт
Сценарий 3: Недостаточно ресурсов
# Конфигурация Celery
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # один воркер = одна задача одновременно
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
CELERY_WORKER_CONCURRENCY = 4 # всего 4 параллельных задачи
# Если база данных PostgreSQL может обрабатывать 1000 операций/сек,
# а воркеры ограничены на 4 параллельных задачи,
# каждая из которых обращается к БД:
# → максимум 400 операций/сек из БД
# → остальные 600 op/sec остаются в очереди
Математика очереди
from dataclasses import dataclass
from typing import List
@dataclass
class QueueMetrics:
arrival_rate: float # задач/сек
processing_rate: float # задач/сек
queue_size: int
@property
def is_stable(self) -> bool:
"""Очередь не растёт если processing >= arrival"""
return self.processing_rate >= self.arrival_rate
@property
def queue_growth_per_second(self) -> float:
"""На сколько задач в сек растёт очередь"""
return self.arrival_rate - self.processing_rate
def estimate_wait_time(self) -> float:
"""Примерное время ожидания в секундах"""
if self.processing_rate == 0:
return float(inf)
return self.queue_size / self.processing_rate
# Пример
metrics = QueueMetrics(
arrival_rate=1000, # 1000 задач в сек
processing_rate=800, # 800 задач в сек
queue_size=0
)
print(f"Рост очереди: {metrics.queue_growth_per_second} задач/сек") # 200
print(f"За 1 минуту очередь: {metrics.queue_growth_per_second * 60} задач") # 12000
print(f"За 1 час: {metrics.queue_growth_per_second * 3600} задач") # 720000
Пример из реальной системы
# Django ORM + Celery + PostgreSQL
from celery import shared_task
from django.core.mail import send_mail
from app.models import EmailTask
@shared_task(bind=True, max_retries=3)
def send_notification_email(self, user_id: int):
try:
user = User.objects.get(id=user_id)
# ⚠️ Медленная операция: SMTP запрос (2-5 сек)
send_mail(
subject=Уведомление,
message=Привет!,
from_email=noreply@example.com,
recipient_list=[user.email],
fail_silently=False
)
EmailTask.objects.filter(id=user_id).update(status=sent)
except Exception as exc:
# Повтор с экспоненциальной задержкой
raise self.retry(exc=exc, countdown=60)
# Если приходит 10000 писем в минуту:
# - Записано в БД: ~167 писем/сек
# - Отправлено через SMTP: ~2-5 писем/сек на одного воркера
# - С 10 воркерами: 20-50 писем/сек
# - Очередь растёт на 117-147 писем/сек
Как БД управляет очередью
# Типичная таблица очереди
CREATE TABLE task_queue (
id BIGSERIAL PRIMARY KEY,
task_type VARCHAR(100),
payload JSONB,
status VARCHAR(20), -- pending, processing, completed, failed
created_at TIMESTAMP DEFAULT NOW(),
scheduled_at TIMESTAMP,
attempts INT DEFAULT 0,
last_error TEXT,
updated_at TIMESTAMP
);
# Индексы для быстрого поиска
CREATE INDEX idx_task_status_created
ON task_queue(status, created_at)
WHERE status = pending;
# Типичный запрос воркера
SELECT * FROM task_queue
WHERE status = pending
ORDER BY created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED; -- Важно для многопроцессности
# Если строк с status=pending миллионы → очередь переполнена
Решение проблемы
-
Увеличить производительность воркеров
- Добавить больше воркеров
- Оптимизировать код обработки
- Использовать асинхронные операции
-
Снизить нагрузку на очередь
- Фильтровать лишние задачи
- Использовать rate limiting
- Дебаунсинг частых событий
-
Масштабирование
- Использовать Redis вместо БД для очереди
- Шардировать очередь
- Распределённые воркеры
-
Мониторинг
- Отслеживать size очереди
- Алерты при переполнении
- Метрики latency обработки
Асинхронная очередь в БД — это нормальное явление, которое показывает несоответствие между производством и потреблением задач. Правильный мониторинг и масштабирование помогают держать очередь под контролем.