Зачем нужна кооперативная многозадачность в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Кооперативная многозадачность в Python (Cooperative Multitasking)
Кооперативная многозадачность — это парадигма, где несколько задач выполняются в одном потоке, с явной передачей управления. Это решает множество проблем параллелизма в Python.
Проблема: почему многопоточность недостаточно?
GIL (Global Interpreter Lock)
Python имеет GIL — глобальную блокировку интерпретатора. Только один поток может выполнять Python код одновременно.
import threading
import time
# ❌ ПЛОХО: многопоточность НЕ даёт ускорение
def cpu_intensive_task(n):
total = 0
for i in range(n):
total += i
return total
start = time.time()
# Один поток
result1 = cpu_intensive_task(100000000)
result2 = cpu_intensive_task(100000000)
print(f"Однопоточно: {time.time() - start:.2f}s")
# Два потока (медленнее из-за GIL!)
start = time.time()
t1 = threading.Thread(target=cpu_intensive_task, args=(100000000,))
t2 = threading.Thread(target=cpu_intensive_task, args=(100000000,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Двухпоточно: {time.time() - start:.2f}s") # Медленнее!
Выход:
Однопоточно: 3.5s
Двухпоточно: 5.2s # Медленнее!
Почему? Потому что потоки конкурируют за GIL, и переключение контекста забирает время.
Блокирующие операции
Если функция блокируется на I/O (сеть, файлы, БД), весь поток ждёт:
import requests
import threading
# ❌ ПЛОХО: если fetch_data блокируется 1 сек, всё ждёт
def fetch_data(url):
response = requests.get(url) # 1 секунда ожидания
return response.json()
def process_urls(urls):
results = []
for url in urls:
result = fetch_data(url) # Последовательно, 10 сек всего
results.append(result)
return results
start = time.time()
results = process_urls(['http://api1.com', 'http://api2.com', ...] * 10)
print(f"Время: {time.time() - start:.2f}s") # ~10 seconds
Многопоточность помогает, но добавляет сложность:
# Многопоточность работает, но...
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(fetch_data, url) for url in urls]
results = [f.result() for f in futures] # 1 сек вместо 10
# Но нужно управлять потоками, синхронизацией, исключениями...
Решение: Кооперативная многозадачность (Async/Await)
Вместо полагаться на операционную систему переключать потоки, сама программа говорит: "Я ждал, забери у меня управление!".
# ✅ ХОРОШО: асинхронность без потоков
import asyncio
import aiohttp
async def fetch_data(session, url):
async with session.get(url) as response:
return await response.json()
async def process_urls(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_data(session, url) for url in urls]
results = await asyncio.gather(*tasks) # Параллельно в одном потоке!
return results
async def main():
urls = ['http://api1.com', 'http://api2.com'] * 10
start = time.time()
results = await process_urls(urls)
print(f"Время: {time.time() - start:.2f}s") # ~1 сек!
asyncio.run(main())
Вывод: 10 секунд → 1 секунда!
Как это работает: Event Loop
Еvent Loop — это сердце асинхронности:
┌─────────────────────────────────────────┐
│ Event Loop (в одном потоке) │
├─────────────────────────────────────────┤
│ │
│ 1. Task 1: fetch_data('http://api1') │
│ → Запрос ушёл, управление взято │
│ │
│ 2. Task 2: fetch_data('http://api2') │
│ → Запрос ушёл, управление взято │
│ │
│ 3. Task 3: fetch_data('http://api3') │
│ → Запрос ушёл, управление взято │
│ │
│ [Ожидаем ответов в фоне] │
│ │
│ ← api1 вернула ответ │
│ Task 1 продолжает работу │
│ │
│ ← api2 вернула ответ │
│ Task 2 продолжает работу │
│ │
└─────────────────────────────────────────┘
Всё в одном потоке, но кажется параллельным!
Ключевая концепция: await
await — это точка, где функция говорит: "Я ждаю результата. Дай управление другой задаче".
async def example():
print("1. Начало") # Выполняется
result = await long_running_task() # ⬅️ AWAIT: передаём управление
# В этот момент event loop может выполнить другие задачи
print(f"2. Результат: {result}") # Выполняется после завершения
# БЕЗ await функция заблокирует весь event loop:
async def blocking_example():
result = long_running_task() # ❌ Блокирует!
# Даже другие асинхронные задачи не выполнятся
Практические примеры
Пример 1: Параллельные HTTP запросы
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def fetch_multiple_urls(urls):
async with aiohttp.ClientSession() as session:
# Все запросы идут параллельно
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Использование
async def main():
urls = ['https://api.github.com/users/github', 'https://api.github.com/users/google']
results = await fetch_multiple_urls(urls)
print(f"Получили {len(results)} ответов")
asyncio.run(main())
Пример 2: FastAPI с асинхронностью
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
import aioredis
app = FastAPI()
# Асинхронный движок БД
engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/db')
# Асинхронный Redis
redis = None
@app.on_event('startup')
async def startup():
global redis
redis = await aioredis.create_redis_pool('redis://localhost')
@app.get('/users/{user_id}')
async def get_user(user_id: int):
# Все операции асинхронные, не блокируют друг друга
async with AsyncSession(engine) as session:
user = await session.get(User, user_id)
# Кэш в Redis
cached = await redis.get(f'user:{user_id}')
if cached:
return json.loads(cached)
result = {'id': user.id, 'name': user.name}
await redis.set(f'user:{user_id}', json.dumps(result))
return result
# 10 одновременных запросов выполняются в одном потоке
# Благодаря асинхронности!
Пример 3: Таймауты и отмены
async def fetch_with_timeout(url, timeout=5):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=timeout) as response:
return await response.json()
except asyncio.TimeoutError:
return {'error': 'Timeout'}
async def cancel_example():
# Создаём задачу
task = asyncio.create_task(long_running_task())
try:
# Ждём 5 секунд
result = await asyncio.wait_for(task, timeout=5)
except asyncio.TimeoutError:
task.cancel() # Отменяем задачу
print("Задача отменена")
Пример 4: Очередь задач (Producer-Consumer)
import asyncio
async def producer(queue):
for i in range(10):
await queue.put(i)
await asyncio.sleep(0.1) # Имитируем работу
async def consumer(queue, worker_id):
while True:
item = await queue.get() # Ждём элемент (без блокировки!)
if item is None: # Сигнал завершения
break
print(f"Worker {worker_id} обрабатывает {item}")
await asyncio.sleep(0.2) # Обработка
queue.task_done()
async def main():
queue = asyncio.Queue()
# Producer и 3 consumer'а работают параллельно
await asyncio.gather(
producer(queue),
consumer(queue, 1),
consumer(queue, 2),
consumer(queue, 3),
)
asyncio.run(main())
Когда использовать async/await?
Идеально для:
- I/O операции (HTTP, БД, файлы): 10-100x ускорение
- Веб-приложения (FastAPI, Sanic): обрабатываем 1000+ одновременных клиентов в одном потоке
- Очереди сообщений (RabbitMQ, Kafka)
- Вебсокеты и real-time системы
- Микросервисы с множеством внешних вызовов
Не идеально для:
- CPU-интенсивные операции: используй multiprocessing
- Простые скрипты: усложняет код
Практический пример: миграция с sync на async
# ❌ SYNC: вызвать 1000 API берёт 1000 сек
def fetch_all_sync(user_ids):
results = []
for uid in user_ids:
response = requests.get(f'https://api.example.com/users/{uid}')
results.append(response.json())
return results
# ✅ ASYNC: вызвать 1000 API берёт 1-2 сек
async def fetch_all_async(user_ids):
async with aiohttp.ClientSession() as session:
tasks = [
fetch_url(session, f'https://api.example.com/users/{uid}')
for uid in user_ids
]
return await asyncio.gather(*tasks)
# Ускорение: 1000x раз!
Выводы
Кооперативная многозадачность нужна для:
- Обхода GIL (GIL не блокирует при await)
- Эффективной работы с I/O операциями
- Масштабирования веб-приложений
- Обработки множества одновременных клиентов
Золотые правила:
- Используй
async/awaitдля I/O операций - Используй
multiprocessingдля CPU операций - Всегда
awaitасинхронные функции - Избегай блокирующих операций внутри async функций
- Используй асинхронные версии библиотек (aiohttp вместо requests)
Кооперативная многозадачность — это не просто оптимизация, это новая парадигма, которая делает Python мощным для современных высоконагруженных систем.