← Назад к вопросам

В каком месте проекта возникнет очередь у нагруженного асинхронного кода

3.0 Senior🔥 121 комментариев
#Архитектура и паттерны#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Очереди в нагруженном асинхронном коде

Очередь (Queue) в асинхронном коде возникает в местах, где обработка данных происходит медленнее, чем они поступают. Это критически важно понимать для диагностики узких мест в системе.

Где возникают очереди

1. На входе асинхронного приложения

Самое первое место формирования очереди — это TCP/HTTP слой:

import asyncio
from aiohttp import web

async def slow_handler(request):
    """Медленный обработчик"""
    await asyncio.sleep(5)  # Обработка за 5 секунд
    return web.Response(text="Done")

app = web.Application()
app.router.add_get('/', slow_handler)
web.run_app(app, port=8080)

# Если приходит 100 запросов в секунду (RPS = 100),
# а каждый обрабатывается 5 секунд, то очередь будет расти!
# Пропускная способность = 1 запрос / 5 сек = 0.2 RPS
# Задержанные запросы = 100 * 5 = 500 запросов в очереди

Очередь находится:

  • На уровне OS — TCP backlog (очередь SYN пакетов)
  • На уровне фреймворка — asyncio event loop
  • На уровне приложения — внутри асинхронного кода
# Демонстрация очередей
import asyncio
import time
from typing import List

async def process_request(request_id: int, duration: float) -> float:
    """Обрабатываем один запрос"""
    start = time.time()
    print(f"Request {request_id} started")
    await asyncio.sleep(duration)  # Долгая операция
    elapsed = time.time() - start
    print(f"Request {request_id} completed in {elapsed:.1f}s")
    return elapsed

async def main():
    # Если отправить 10 задач, где каждая занимает 2 секунды
    tasks = [process_request(i, 2) for i in range(10)]
    
    # С asyncio они выполняются параллельно
    results = await asyncio.gather(*tasks)
    total_time = sum(results)
    print(f"\nTotal time (parallel): {max(results):.1f}s")
    print(f"Sum of durations: {total_time:.1f}s")
    
    # Но если система перегружена, очередь растёт!

asyncio.run(main())

2. На уровне базы данных

Очередь возникает, когда асинхронные задачи пытаются получить соединение с БД:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.pool import QueuePool

# Пул соединений с параметрами
engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/db",
    poolclass=QueuePool,
    pool_size=10,           # Макс 10 одновременных соединений
    max_overflow=20,        # Макс 20 дополнительных (очередь!)
    pool_recycle=3600,      # Переиспользовать соединения
    echo=True
)

async def query_database(session: AsyncSession, user_id: int):
    # Если все 10 соединений заняты, задача ждёт в очереди
    # max_overflow=20 позволяет создать ещё 20, но это дорого
    result = await session.execute(
        "SELECT * FROM users WHERE id = :id",
        {"id": user_id}
    )
    return result.scalar()

Очередь в БД:

  • Connection pool — очередь ждущих соединений
  • Query queue — очередь SQL запросов на сервере БД
  • Lock queue — очередь транзакций, ждущих блокировок

3. На уровне внешних API

Очередь при обращении к внешним сервисам:

import aiohttp
import asyncio
from asyncio import Semaphore

class RateLimitedHTTPClient:
    def __init__(self, max_concurrent_requests: int = 5):
        # Семафор ограничивает одновременные запросы
        self.semaphore = Semaphore(max_concurrent_requests)
        self.session = None
    
    async def fetch(self, url: str) -> str:
        # Если все 5 слотов заняты, остальные ждут в очереди
        async with self.semaphore:
            async with self.session.get(url) as response:
                return await response.text()
    
    async def fetch_many(self, urls: list) -> list:
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)

# Использование
async def main():
    client = RateLimitedHTTPClient(max_concurrent_requests=5)
    async with aiohttp.ClientSession() as session:
        client.session = session
        # 100 задач, но только 5 параллельно
        # 95 будут ждать в очереди семафора!
        results = await client.fetch_many([f"http://api.example.com/{i}" for i in range(100)])

4. На уровне обработчика событий (Event Loop)

import asyncio

async def fast_task(id: int) -> None:
    """Быстрая задача"""
    await asyncio.sleep(0.001)
    print(f"Fast task {id}")

async def slow_task(id: int) -> None:
    """Медленная задача, блокирует event loop"""
    await asyncio.sleep(1)
    print(f"Slow task {id}")

async def main():
    # Если запустить 1 медленную задачу и 10 быстрых
    # Быстрые будут ждать в очереди event loop!
    tasks = [
        slow_task(0),
        *[fast_task(i) for i in range(10)]
    ]
    await asyncio.gather(*tasks)

asyncio.run(main())
# Вывод: fast tasks будут выполнены только после slow_task!

5. На уровне микросервисов/очередей сообщений

Очередь при обработке сообщений:

from aiormq import Channel
from aio_pika import connect_robust, IncomingMessage
import asyncio

async def process_message(message: IncomingMessage) -> None:
    """Обработчик сообщения"""
    body = message.body.decode()
    print(f"Processing: {body}")
    await asyncio.sleep(5)  # Долгая обработка
    await message.ack()  # Подтверждаем обработку

async def main():
    connection = await connect_robust("amqp://guest:guest@localhost/")
    channel = await connection.channel()
    
    # Объявляем очередь
    queue = await channel.declare_queue("tasks")
    
    # Устанавливаем prefetch_count = 1
    # Это означает, что обработчик получит max 1 сообщение одновременно
    await channel.set_qos(prefetch_count=1)
    
    # Если в очереди 1000 сообщений, но обработчик берёт по 1,
    # то очередь будет медленно уменьшаться
    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            await process_message(message)

Где точно возникает очередь

Диагностический стек

import asyncio
import time
from typing import List, Callable
import traceback

class QueueDiagnostics:
    """Диагностика очередей в асинхронном коде"""
    
    async def measure_queue_depth(
        self,
        producer: Callable,
        consumer: Callable,
        duration: float = 10
    ) -> None:
        """Измеряем глубину очереди"""
        queue: asyncio.Queue = asyncio.Queue()
        queue_sizes: List[int] = []
        
        async def producer_task():
            start = time.time()
            while time.time() - start < duration:
                await queue.put(f"item_{int(time.time() * 1000)}")
                await asyncio.sleep(0.01)  # Быстрое добавление
        
        async def consumer_task():
            start = time.time()
            while time.time() - start < duration:
                item = await queue.get()
                await consumer(item)  # Медленная обработка
                queue.task_done()
        
        async def monitor_task():
            start = time.time()
            while time.time() - start < duration:
                size = queue.qsize()
                queue_sizes.append(size)
                print(f"Queue size: {size}")
                await asyncio.sleep(0.5)
        
        # Запускаем параллельно
        await asyncio.gather(
            producer_task(),
            consumer_task(),
            monitor_task()
        )
        
        print(f"\nMax queue depth: {max(queue_sizes)}")
        print(f"Avg queue depth: {sum(queue_sizes) / len(queue_sizes):.2f}")

# Пример использования
async def slow_consumer(item: str) -> None:
    """Медленный обработчик"""
    await asyncio.sleep(1)  # 1 сек на обработку
    print(f"Processed: {item}")

async def main():
    diagnostics = QueueDiagnostics()
    await diagnostics.measure_queue_depth(
        producer=lambda: None,
        consumer=slow_consumer,
        duration=5
    )

# asyncio.run(main())

Как избежать очередей

# 1. Увеличить пропускную способность
# Используй batch processing
async def process_batch(items: List[str]) -> None:
    """Обработка партиями быстрее"""
    # Запросить все одновременно
    results = await asyncio.gather(*[process_item(item) for item in items])

# 2. Использовать пулы рабочих
class WorkerPool:
    def __init__(self, worker_count: int = 10):
        self.queue = asyncio.Queue()
        self.workers = []
    
    async def add_task(self, coro) -> None:
        await self.queue.put(coro)
    
    async def worker(self) -> None:
        while True:
            coro = await self.queue.get()
            try:
                await coro
            finally:
                self.queue.task_done()

# 3. Масштабирование по горизонтали
# Запустить несколько инстансов приложения
# Использовать load balancer (nginx, Kubernetes)

# 4. Кэширование результатов
from functools import lru_cache

@lru_cache(maxsize=1000)
async def get_cached_data(key: str) -> str:
    # Результат кэшируется, повторные вызовы не идут в очередь
    return await fetch_from_api(key)

Заключение

Очередь в асинхронном коде возникает в местах, где обработка данных медленнее, чем их поступление:

  1. Входной слой — TCP/HTTP соединения
  2. БД соединения — пул соединений
  3. Внешние API — Rate limiting, семафоры
  4. Event Loop — долгие корутины блокируют другие
  5. Очереди сообщений — нехватка обработчиков

Диагностировать очередь помогает мониторинг, логирование и нагрузочное тестирование.

В каком месте проекта возникнет очередь у нагруженного асинхронного кода | PrepBro