← Назад к вопросам
Как не допустить зависание задачи в асинхронности?
2.0 Middle🔥 211 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Предотвращение зависания задач в асинхронности
Зависание (deadlock, infinite wait) — критическая проблема асинхронного кода. Вот полный набор техник для предотвращения.
1. Таймауты — основной инструмент
asyncio.wait_for() — самый простой способ:
import asyncio
async def slow_request():
await asyncio.sleep(100) # Имитация долгого запроса
async def main():
try:
result = await asyncio.wait_for(
slow_request(),
timeout=5.0 # Максимум 5 секунд
)
except asyncio.TimeoutError:
print("Запрос зависает, отменяем")
asyncio.run(main())
2. Таймауты для HTTP запросов
aiohttp с таймаутом:
import aiohttp
import asyncio
async def fetch(url: str):
timeout = aiohttp.ClientTimeout(total=10, connect=5) # 10s всего, 5s на подключение
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as response:
return await response.text()
async def main():
try:
result = await fetch("https://example.com")
except asyncio.TimeoutError:
print("HTTP запрос зависает")
asyncio.run(main())
httpx с таймаутом:
import httpx
import asyncio
async def fetch(url: str):
timeout = httpx.Timeout(10.0) # 10 секунд
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(url)
return response.text
asyncio.run(fetch("https://example.com"))
3. Таймауты для БД операций
asyncpg (PostgreSQL):
import asyncpg
import asyncio
async def fetch_user(user_id: int):
try:
conn = await asyncio.wait_for(
asyncpg.connect("postgresql://localhost/mydb"),
timeout=5.0
)
result = await asyncio.wait_for(
conn.fetch("SELECT * FROM users WHERE id = $1", user_id),
timeout=5.0
)
await conn.close()
return result
except asyncio.TimeoutError:
print("БД запрос зависает")
SQLAlchemy async:
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
import asyncio
async def get_user(user_id: int):
engine = create_async_engine(
"postgresql+asyncpg://localhost/mydb",
connect_args={"timeout": 5} # Таймаут подключения
)
async with AsyncSession(engine) as session:
try:
stmt = select(User).where(User.id == user_id)
result = await asyncio.wait_for(
session.execute(stmt),
timeout=5.0 # Таймаут запроса
)
return result.scalars().first()
except asyncio.TimeoutError:
print("Запрос БД зависает")
4. Таймауты для очередей (Queue)
import asyncio
async def process_queue():
queue = asyncio.Queue()
async def worker():
try:
while True:
# Ожидаем элемент с таймаутом
item = await asyncio.wait_for(
queue.get(),
timeout=10.0
)
print(f"Обработка {item}")
except asyncio.TimeoutError:
print("Ничего в очереди 10 секунд")
asyncio.create_task(worker())
# Добавляем элементы с задержкой
for i in range(3):
await asyncio.sleep(3)
queue.put_nowait(f"item_{i}")
asyncio.run(process_queue())
5. Защита от Deadlock в семафорах
import asyncio
async def deadlock_example():
lock = asyncio.Lock()
async def task1():
async with lock:
print("Task 1 получила lock")
await asyncio.sleep(1)
# Если Task 2 также ждёт lock — deadlock!
async def task2():
# Таймаут при попытке получить lock
try:
async with asyncio.wait_for(
lock.acquire(),
timeout=2.0
):
print("Task 2 получила lock")
except asyncio.TimeoutError:
print("Task 2 не смогла получить lock (deadlock!)")
await asyncio.gather(
task1(),
asyncio.sleep(0.1), # Task 2 стартует после Task 1
task2()
)
6. Shield для критических операций
import asyncio
async def critical_operation():
try:
# Эта операция не будет отменена даже с таймаутом
result = await asyncio.shield(
asyncio.sleep(5)
)
except asyncio.TimeoutError:
print("Таймаут, но операция продолжается в фоне")
7. Контроль зависания в asyncio.gather()
import asyncio
async def risky_task():
await asyncio.sleep(100) # Может зависнуть
async def main():
# Выполняем несколько задач с таймаутом
try:
results = await asyncio.wait_for(
asyncio.gather(
risky_task(),
risky_task(),
return_exceptions=True # Не прерываем другие на исключение
),
timeout=5.0
)
except asyncio.TimeoutError:
print("Одна или несколько задач зависают")
8. asyncio.wait() с таймаутом
import asyncio
async def main():
tasks = [
asyncio.create_task(asyncio.sleep(10)),
asyncio.create_task(asyncio.sleep(10))
]
# Ждём первый результат или таймаут
done, pending = await asyncio.wait(
tasks,
timeout=2.0,
return_when=asyncio.FIRST_COMPLETED # Вернуть при первом результате
)
print(f"Завершено: {len(done)}, Ожидают: {len(pending)}")
# Отменяем оставшиеся задачи
for task in pending:
task.cancel()
9. Контроль зависания в Context Manager
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def timeout_context(seconds: float):
try:
async with asyncio.timeout(seconds): # Python 3.11+
yield
except asyncio.TimeoutError:
print(f"Операция зависла (таймаут {seconds}s)")
async def example():
async with timeout_context(2.0):
await asyncio.sleep(10) # Зависнет!
asyncio.run(example())
10. Комплексный пример с логированием
import asyncio
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def fetch_with_timeout(
url: str,
timeout: float = 10.0,
max_retries: int = 3
) -> str:
"""Безопасный fetch с таймаутом и повторами."""
for attempt in range(max_retries):
try:
logger.info(f"Попытка {attempt + 1}/{max_retries}: {url}")
# Пример с asyncio
result = await asyncio.wait_for(
asyncio.sleep(5), # Имитация HTTP
timeout=timeout
)
logger.info(f"Успех: {url}")
return result
except asyncio.TimeoutError:
logger.warning(f"Таймаут (попытка {attempt + 1})")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Экспоненциальный backoff
else:
logger.error(f"Не удалось после {max_retries} попыток")
raise
async def main():
try:
result = await fetch_with_timeout("https://example.com", timeout=2.0)
except asyncio.TimeoutError:
logger.error("Финальный таймаут")
asyncio.run(main())
11. Race condition с await
import asyncio
async def unsafe_operation():
# Могут быть race condition между проверкой и действием
if condition: # Проверка
await asyncio.sleep(1) # Действие (между ними может измениться condition)
async def safe_operation():
# Используй lock или atomic операции
async with lock:
if condition:
await asyncio.sleep(1)
Лучшие практики
✅ Делай так:
- Всегда добавляй таймауты к внешним операциям
- Логируй зависания для отладки
- Используй
asyncio.wait_for()по умолчанию - Тестируй сценарии медленного интернета
- Используй
return_exceptions=Trueвgather()
❌ Не делай так:
awaitбез таймаутов на HTTP/БД- Бесконечные while True без условия выхода
- Скрытые синхронные операции в async (блокируют цикл)
- Игнорируй
asyncio.TimeoutError
Зависание в асинхронном коде — это молчаливый убийца производительности. Таймауты — твой первый лучший друг.