← Назад к вопросам
В каком месте проекта возникнет очередь у нагруженного асинхронного кода
3.0 Senior🔥 121 комментариев
#Архитектура и паттерны#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Очереди в нагруженном асинхронном коде
Очередь (Queue) в асинхронном коде возникает в местах, где обработка данных происходит медленнее, чем они поступают. Это критически важно понимать для диагностики узких мест в системе.
Где возникают очереди
1. На входе асинхронного приложения
Самое первое место формирования очереди — это TCP/HTTP слой:
import asyncio
from aiohttp import web
async def slow_handler(request):
"""Медленный обработчик"""
await asyncio.sleep(5) # Обработка за 5 секунд
return web.Response(text="Done")
app = web.Application()
app.router.add_get('/', slow_handler)
web.run_app(app, port=8080)
# Если приходит 100 запросов в секунду (RPS = 100),
# а каждый обрабатывается 5 секунд, то очередь будет расти!
# Пропускная способность = 1 запрос / 5 сек = 0.2 RPS
# Задержанные запросы = 100 * 5 = 500 запросов в очереди
Очередь находится:
- На уровне OS — TCP backlog (очередь SYN пакетов)
- На уровне фреймворка — asyncio event loop
- На уровне приложения — внутри асинхронного кода
# Демонстрация очередей
import asyncio
import time
from typing import List
async def process_request(request_id: int, duration: float) -> float:
"""Обрабатываем один запрос"""
start = time.time()
print(f"Request {request_id} started")
await asyncio.sleep(duration) # Долгая операция
elapsed = time.time() - start
print(f"Request {request_id} completed in {elapsed:.1f}s")
return elapsed
async def main():
# Если отправить 10 задач, где каждая занимает 2 секунды
tasks = [process_request(i, 2) for i in range(10)]
# С asyncio они выполняются параллельно
results = await asyncio.gather(*tasks)
total_time = sum(results)
print(f"\nTotal time (parallel): {max(results):.1f}s")
print(f"Sum of durations: {total_time:.1f}s")
# Но если система перегружена, очередь растёт!
asyncio.run(main())
2. На уровне базы данных
Очередь возникает, когда асинхронные задачи пытаются получить соединение с БД:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.pool import QueuePool
# Пул соединений с параметрами
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/db",
poolclass=QueuePool,
pool_size=10, # Макс 10 одновременных соединений
max_overflow=20, # Макс 20 дополнительных (очередь!)
pool_recycle=3600, # Переиспользовать соединения
echo=True
)
async def query_database(session: AsyncSession, user_id: int):
# Если все 10 соединений заняты, задача ждёт в очереди
# max_overflow=20 позволяет создать ещё 20, но это дорого
result = await session.execute(
"SELECT * FROM users WHERE id = :id",
{"id": user_id}
)
return result.scalar()
Очередь в БД:
- Connection pool — очередь ждущих соединений
- Query queue — очередь SQL запросов на сервере БД
- Lock queue — очередь транзакций, ждущих блокировок
3. На уровне внешних API
Очередь при обращении к внешним сервисам:
import aiohttp
import asyncio
from asyncio import Semaphore
class RateLimitedHTTPClient:
def __init__(self, max_concurrent_requests: int = 5):
# Семафор ограничивает одновременные запросы
self.semaphore = Semaphore(max_concurrent_requests)
self.session = None
async def fetch(self, url: str) -> str:
# Если все 5 слотов заняты, остальные ждут в очереди
async with self.semaphore:
async with self.session.get(url) as response:
return await response.text()
async def fetch_many(self, urls: list) -> list:
tasks = [self.fetch(url) for url in urls]
return await asyncio.gather(*tasks)
# Использование
async def main():
client = RateLimitedHTTPClient(max_concurrent_requests=5)
async with aiohttp.ClientSession() as session:
client.session = session
# 100 задач, но только 5 параллельно
# 95 будут ждать в очереди семафора!
results = await client.fetch_many([f"http://api.example.com/{i}" for i in range(100)])
4. На уровне обработчика событий (Event Loop)
import asyncio
async def fast_task(id: int) -> None:
"""Быстрая задача"""
await asyncio.sleep(0.001)
print(f"Fast task {id}")
async def slow_task(id: int) -> None:
"""Медленная задача, блокирует event loop"""
await asyncio.sleep(1)
print(f"Slow task {id}")
async def main():
# Если запустить 1 медленную задачу и 10 быстрых
# Быстрые будут ждать в очереди event loop!
tasks = [
slow_task(0),
*[fast_task(i) for i in range(10)]
]
await asyncio.gather(*tasks)
asyncio.run(main())
# Вывод: fast tasks будут выполнены только после slow_task!
5. На уровне микросервисов/очередей сообщений
Очередь при обработке сообщений:
from aiormq import Channel
from aio_pika import connect_robust, IncomingMessage
import asyncio
async def process_message(message: IncomingMessage) -> None:
"""Обработчик сообщения"""
body = message.body.decode()
print(f"Processing: {body}")
await asyncio.sleep(5) # Долгая обработка
await message.ack() # Подтверждаем обработку
async def main():
connection = await connect_robust("amqp://guest:guest@localhost/")
channel = await connection.channel()
# Объявляем очередь
queue = await channel.declare_queue("tasks")
# Устанавливаем prefetch_count = 1
# Это означает, что обработчик получит max 1 сообщение одновременно
await channel.set_qos(prefetch_count=1)
# Если в очереди 1000 сообщений, но обработчик берёт по 1,
# то очередь будет медленно уменьшаться
async with queue.iterator() as queue_iter:
async for message in queue_iter:
await process_message(message)
Где точно возникает очередь
Диагностический стек
import asyncio
import time
from typing import List, Callable
import traceback
class QueueDiagnostics:
"""Диагностика очередей в асинхронном коде"""
async def measure_queue_depth(
self,
producer: Callable,
consumer: Callable,
duration: float = 10
) -> None:
"""Измеряем глубину очереди"""
queue: asyncio.Queue = asyncio.Queue()
queue_sizes: List[int] = []
async def producer_task():
start = time.time()
while time.time() - start < duration:
await queue.put(f"item_{int(time.time() * 1000)}")
await asyncio.sleep(0.01) # Быстрое добавление
async def consumer_task():
start = time.time()
while time.time() - start < duration:
item = await queue.get()
await consumer(item) # Медленная обработка
queue.task_done()
async def monitor_task():
start = time.time()
while time.time() - start < duration:
size = queue.qsize()
queue_sizes.append(size)
print(f"Queue size: {size}")
await asyncio.sleep(0.5)
# Запускаем параллельно
await asyncio.gather(
producer_task(),
consumer_task(),
monitor_task()
)
print(f"\nMax queue depth: {max(queue_sizes)}")
print(f"Avg queue depth: {sum(queue_sizes) / len(queue_sizes):.2f}")
# Пример использования
async def slow_consumer(item: str) -> None:
"""Медленный обработчик"""
await asyncio.sleep(1) # 1 сек на обработку
print(f"Processed: {item}")
async def main():
diagnostics = QueueDiagnostics()
await diagnostics.measure_queue_depth(
producer=lambda: None,
consumer=slow_consumer,
duration=5
)
# asyncio.run(main())
Как избежать очередей
# 1. Увеличить пропускную способность
# Используй batch processing
async def process_batch(items: List[str]) -> None:
"""Обработка партиями быстрее"""
# Запросить все одновременно
results = await asyncio.gather(*[process_item(item) for item in items])
# 2. Использовать пулы рабочих
class WorkerPool:
def __init__(self, worker_count: int = 10):
self.queue = asyncio.Queue()
self.workers = []
async def add_task(self, coro) -> None:
await self.queue.put(coro)
async def worker(self) -> None:
while True:
coro = await self.queue.get()
try:
await coro
finally:
self.queue.task_done()
# 3. Масштабирование по горизонтали
# Запустить несколько инстансов приложения
# Использовать load balancer (nginx, Kubernetes)
# 4. Кэширование результатов
from functools import lru_cache
@lru_cache(maxsize=1000)
async def get_cached_data(key: str) -> str:
# Результат кэшируется, повторные вызовы не идут в очередь
return await fetch_from_api(key)
Заключение
Очередь в асинхронном коде возникает в местах, где обработка данных медленнее, чем их поступление:
- Входной слой — TCP/HTTP соединения
- БД соединения — пул соединений
- Внешние API — Rate limiting, семафоры
- Event Loop — долгие корутины блокируют другие
- Очереди сообщений — нехватка обработчиков
Диагностировать очередь помогает мониторинг, логирование и нагрузочное тестирование.