← Назад к вопросам

Зачем нужна кооперативная многозадачность в Python?

2.0 Middle🔥 131 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Кооперативная многозадачность в Python (Cooperative Multitasking)

Кооперативная многозадачность — это парадигма, где несколько задач выполняются в одном потоке, с явной передачей управления. Это решает множество проблем параллелизма в Python.

Проблема: почему многопоточность недостаточно?

GIL (Global Interpreter Lock)

Python имеет GIL — глобальную блокировку интерпретатора. Только один поток может выполнять Python код одновременно.

import threading
import time

# ❌ ПЛОХО: многопоточность НЕ даёт ускорение
def cpu_intensive_task(n):
    total = 0
    for i in range(n):
        total += i
    return total

start = time.time()

# Один поток
result1 = cpu_intensive_task(100000000)
result2 = cpu_intensive_task(100000000)

print(f"Однопоточно: {time.time() - start:.2f}s")

# Два потока (медленнее из-за GIL!)
start = time.time()
t1 = threading.Thread(target=cpu_intensive_task, args=(100000000,))
t2 = threading.Thread(target=cpu_intensive_task, args=(100000000,))
t1.start()
t2.start()
t1.join()
t2.join()

print(f"Двухпоточно: {time.time() - start:.2f}s")  # Медленнее!

Выход:

Однопоточно: 3.5s
Двухпоточно: 5.2s  # Медленнее!

Почему? Потому что потоки конкурируют за GIL, и переключение контекста забирает время.

Блокирующие операции

Если функция блокируется на I/O (сеть, файлы, БД), весь поток ждёт:

import requests
import threading

# ❌ ПЛОХО: если fetch_data блокируется 1 сек, всё ждёт
def fetch_data(url):
    response = requests.get(url)  # 1 секунда ожидания
    return response.json()

def process_urls(urls):
    results = []
    for url in urls:
        result = fetch_data(url)  # Последовательно, 10 сек всего
        results.append(result)
    return results

start = time.time()
results = process_urls(['http://api1.com', 'http://api2.com', ...] * 10)
print(f"Время: {time.time() - start:.2f}s")  # ~10 seconds

Многопоточность помогает, но добавляет сложность:

# Многопоточность работает, но...
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(fetch_data, url) for url in urls]
    results = [f.result() for f in futures]  # 1 сек вместо 10
    # Но нужно управлять потоками, синхронизацией, исключениями...

Решение: Кооперативная многозадачность (Async/Await)

Вместо полагаться на операционную систему переключать потоки, сама программа говорит: "Я ждал, забери у меня управление!".

# ✅ ХОРОШО: асинхронность без потоков
import asyncio
import aiohttp

async def fetch_data(session, url):
    async with session.get(url) as response:
        return await response.json()

async def process_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_data(session, url) for url in urls]
        results = await asyncio.gather(*tasks)  # Параллельно в одном потоке!
    return results

async def main():
    urls = ['http://api1.com', 'http://api2.com'] * 10
    start = time.time()
    results = await process_urls(urls)
    print(f"Время: {time.time() - start:.2f}s")  # ~1 сек!

asyncio.run(main())

Вывод: 10 секунд → 1 секунда!

Как это работает: Event Loop

Еvent Loop — это сердце асинхронности:

┌─────────────────────────────────────────┐
│          Event Loop (в одном потоке)    │
├─────────────────────────────────────────┤
│                                         │
│  1. Task 1: fetch_data('http://api1')  │
│     → Запрос ушёл, управление взято    │
│                                         │
│  2. Task 2: fetch_data('http://api2')  │
│     → Запрос ушёл, управление взято    │
│                                         │
│  3. Task 3: fetch_data('http://api3')  │
│     → Запрос ушёл, управление взято    │
│                                         │
│  [Ожидаем ответов в фоне]               │
│                                         │
│  ← api1 вернула ответ                   │
│  Task 1 продолжает работу               │
│                                         │
│  ← api2 вернула ответ                   │
│  Task 2 продолжает работу               │
│                                         │
└─────────────────────────────────────────┘

Всё в одном потоке, но кажется параллельным!

Ключевая концепция: await

await — это точка, где функция говорит: "Я ждаю результата. Дай управление другой задаче".

async def example():
    print("1. Начало")  # Выполняется
    
    result = await long_running_task()  # ⬅️ AWAIT: передаём управление
    # В этот момент event loop может выполнить другие задачи
    
    print(f"2. Результат: {result}")  # Выполняется после завершения

# БЕЗ await функция заблокирует весь event loop:
async def blocking_example():
    result = long_running_task()  # ❌ Блокирует!
    # Даже другие асинхронные задачи не выполнятся

Практические примеры

Пример 1: Параллельные HTTP запросы

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_multiple_urls(urls):
    async with aiohttp.ClientSession() as session:
        # Все запросы идут параллельно
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

# Использование
async def main():
    urls = ['https://api.github.com/users/github', 'https://api.github.com/users/google']
    results = await fetch_multiple_urls(urls)
    print(f"Получили {len(results)} ответов")

asyncio.run(main())

Пример 2: FastAPI с асинхронностью

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
import aioredis

app = FastAPI()

# Асинхронный движок БД
engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/db')

# Асинхронный Redis
redis = None

@app.on_event('startup')
async def startup():
    global redis
    redis = await aioredis.create_redis_pool('redis://localhost')

@app.get('/users/{user_id}')
async def get_user(user_id: int):
    # Все операции асинхронные, не блокируют друг друга
    async with AsyncSession(engine) as session:
        user = await session.get(User, user_id)
    
    # Кэш в Redis
    cached = await redis.get(f'user:{user_id}')
    if cached:
        return json.loads(cached)
    
    result = {'id': user.id, 'name': user.name}
    await redis.set(f'user:{user_id}', json.dumps(result))
    
    return result

# 10 одновременных запросов выполняются в одном потоке
# Благодаря асинхронности!  

Пример 3: Таймауты и отмены

async def fetch_with_timeout(url, timeout=5):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=timeout) as response:
                return await response.json()
    except asyncio.TimeoutError:
        return {'error': 'Timeout'}

async def cancel_example():
    # Создаём задачу
    task = asyncio.create_task(long_running_task())
    
    try:
        # Ждём 5 секунд
        result = await asyncio.wait_for(task, timeout=5)
    except asyncio.TimeoutError:
        task.cancel()  # Отменяем задачу
        print("Задача отменена")

Пример 4: Очередь задач (Producer-Consumer)

import asyncio

async def producer(queue):
    for i in range(10):
        await queue.put(i)
        await asyncio.sleep(0.1)  # Имитируем работу

async def consumer(queue, worker_id):
    while True:
        item = await queue.get()  # Ждём элемент (без блокировки!)
        if item is None:  # Сигнал завершения
            break
        print(f"Worker {worker_id} обрабатывает {item}")
        await asyncio.sleep(0.2)  # Обработка
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    
    # Producer и 3 consumer'а работают параллельно
    await asyncio.gather(
        producer(queue),
        consumer(queue, 1),
        consumer(queue, 2),
        consumer(queue, 3),
    )

asyncio.run(main())

Когда использовать async/await?

Идеально для:

  • I/O операции (HTTP, БД, файлы): 10-100x ускорение
  • Веб-приложения (FastAPI, Sanic): обрабатываем 1000+ одновременных клиентов в одном потоке
  • Очереди сообщений (RabbitMQ, Kafka)
  • Вебсокеты и real-time системы
  • Микросервисы с множеством внешних вызовов

Не идеально для:

  • CPU-интенсивные операции: используй multiprocessing
  • Простые скрипты: усложняет код

Практический пример: миграция с sync на async

# ❌ SYNC: вызвать 1000 API берёт 1000 сек
def fetch_all_sync(user_ids):
    results = []
    for uid in user_ids:
        response = requests.get(f'https://api.example.com/users/{uid}')
        results.append(response.json())
    return results

# ✅ ASYNC: вызвать 1000 API берёт 1-2 сек
async def fetch_all_async(user_ids):
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_url(session, f'https://api.example.com/users/{uid}')
            for uid in user_ids
        ]
        return await asyncio.gather(*tasks)

# Ускорение: 1000x раз!

Выводы

Кооперативная многозадачность нужна для:

  • Обхода GIL (GIL не блокирует при await)
  • Эффективной работы с I/O операциями
  • Масштабирования веб-приложений
  • Обработки множества одновременных клиентов

Золотые правила:

  1. Используй async/await для I/O операций
  2. Используй multiprocessing для CPU операций
  3. Всегда await асинхронные функции
  4. Избегай блокирующих операций внутри async функций
  5. Используй асинхронные версии библиотек (aiohttp вместо requests)

Кооперативная многозадачность — это не просто оптимизация, это новая парадигма, которая делает Python мощным для современных высоконагруженных систем.

Зачем нужна кооперативная многозадачность в Python? | PrepBro