← Назад к вопросам

Какие плюсы и минусы кооперативной многозадачности?

2.0 Middle🔥 101 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Кооперативная многозадачность: плюсы и минусы

Кооперативная многозадачность (cooperative multitasking) — это подход, когда задачи добровольно отдают управление другим задачам. Это основа asyncio в Python.

Как работает кооперативная многозадачность

# Вытеснительная многозадачность (OS threads)
# ОС сама выбирает, когда переключить на другой поток
# Может произойти в ЛЮБОЙ момент кода

import threading
import time

def task1():
    print("Task 1 started")
    time.sleep(1)  # ОС может переключить здесь
    print("Task 1 finished")

def task2():
    print("Task 2 started")
    time.sleep(1)  # Или здесь
    print("Task 2 finished")

t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)
t1.start()
t2.start()
t1.join()
t2.join()

# Порядок вывода непредсказуем!
# Task 1 started
# Task 2 started
# Task 2 finished
# Task 1 finished
# (или другой порядок)
# Кооперативная многозадачность (asyncio)
# Задача сама решает, когда отдать управление
# Переключение только в точках await

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(1)  # Здесь точка переключения
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)  # И здесь
    print("Task 2 finished")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())

# Порядок более предсказуем!
# Task 1 started
# Task 2 started
# Task 1 finished
# Task 2 finished

Плюсы кооперативной многозадачности

1. Нет race conditions

# Проблема с threading
import threading

counter = 0

def increment_thread():
    global counter
    # ОПАСНО: может быть race condition
    for _ in range(1000):
        counter += 1  # Может быть переключение в середине операции

threads = [threading.Thread(target=increment_thread) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # Может быть не 10000!


# Решение: нужны locks
import threading

counter = 0
lock = threading.Lock()

def increment_thread():
    global counter
    for _ in range(1000):
        with lock:
            counter += 1

# Теперь работает, но медленнее из-за locks
# Asyncio - нет race conditions!
import asyncio

counter = 0

async def increment_async():
    global counter
    # БЕЗОПАСНО: нет race conditions
    for _ in range(1000):
        counter += 1  # Переключение только на await
        # Эта строка выполнится полностью, без перерыва

async def main():
    await asyncio.gather(
        increment_async(),
        increment_async(),
        increment_async(),
        increment_async(),
        increment_async(),
        increment_async(),
        increment_async(),
        increment_async(),
        increment_async(),
        increment_async()
    )
    return counter

result = asyncio.run(main())
print(result)  # Всегда 10000!

2. Меньше памяти

# Каждый OS thread занимает ~1MB памяти
import threading
import sys

def dummy():
    pass

threads = []
for i in range(10):
    t = threading.Thread(target=dummy)
    threads.append(t)
    t.start()

print(f"Memory per thread: ~1MB")
print(f"10 threads: ~10MB")
print(f"1000 threads: ~1GB!")

# Asyncio coroutines занимают ~1KB
# 10000 coroutines = ~10MB (вместо 10GB для threads!)

3. Нет context switch overhead

# Threading: частые переключения контекста = медленно
import threading
import time

def io_task():
    time.sleep(0.1)  # 100ms I/O операция

start = time.time()
threads = [threading.Thread(target=io_task) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Threading: {time.time() - start:.2f}s")  # ~0.1s (параллельно)
# Но с контекст-свичинг overhead


# Asyncio: минимальный overhead
import asyncio

async def io_task_async():
    await asyncio.sleep(0.1)

async def main():
    await asyncio.gather(*[io_task_async() for _ in range(10)])

start = time.time()
asyncio.run(main())
print(f"Asyncio: {time.time() - start:.2f}s")  # ~0.1s быстрее

4. Проще отлаживать

# Threading - может быть в любом порядке
data = []

def append_data():
    # КОГДА ОС переключит поток? Неизвестно!
    data.append('A')
    data.append('B')
    data.append('C')

# Может быть: ['A', 'B', 'C'] или ['A', 'B']
# или другие комбинации


# Asyncio - детерминировано
data = []

async def append_data_async():
    # Выполнится без перерыва до await
    data.append('A')
    data.append('B')
    await asyncio.sleep(0)  # Только здесь переключение
    data.append('C')

# Всегда: ['A', 'B', 'C']

Минусы кооперативной многозадачности

1. Может быть starving (голодание задач)

# Если одна coroutine долго выполняется без await,
# другие могут не запуститься

import asyncio

async def slow_task():
    # ПЛОХО: долгие вычисления без await
    total = 0
    for i in range(100_000_000):
        total += i
    # Другие задачи ждут!
    return total

async def fast_task():
    print("Fast task started")
    await asyncio.sleep(0)  # Стоп, slow_task не закончилась!
    print("Fast task finished")

async def main():
    await asyncio.gather(slow_task(), fast_task())

asyncio.run(main())
# Fast task может ждать очень долго

2. CPU-bound задачи блокируют event loop

# Решение: запустить CPU-bound в отдельном процессе
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive():
    # Вычисление факториала
    result = 1
    for i in range(1, 10000):
        result *= i
    return result

async def main():
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as executor:
        # Запускаем в отдельном процессе
        result = await loop.run_in_executor(executor, cpu_intensive)
        print(result)

asyncio.run(main())

3. Необходимо использовать async везде

# Проблема: если одна библиотека не асинхронная,
# весь код нужно адаптировать

import asyncio
import requests  # Синхронная библиотека

async def fetch_data():
    # ПЛОХО: requests блокирует event loop
    response = requests.get('https://api.example.com/data')  # БЛОКИРУЕТ!
    return response.json()

async def main():
    # Другие coroutines ждут
    result = await fetch_data()


# Решение: использовать асинхронную библиотеку
import aiohttp

async def fetch_data_async():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as response:
            return await response.json()

4. Трудно интегрировать с синхронным кодом

# Синхронная функция не может вызвать async функцию
import asyncio

async def get_data():
    await asyncio.sleep(1)
    return 42

def process_data():
    # ОШИБКА: нельзя вызвать async функцию
    data = get_data()  # Это не выполнится!
    return data * 2


# Нужно оборачивать
def process_data_correctly():
    data = asyncio.run(get_data())
    return data * 2

print(process_data_correctly())  # 84

# Но если уже внутри asyncio:
async def main():
    data = await get_data()
    result = process_data()  # Нельзя вызвать синхронную из async!
    # Если она блокирует, весь event loop замёрзнет

5. Сложнее с отладкой в некотором смысле

# Stack trace может быть запутанным
import asyncio

async def level3():
    raise Exception("Error in level3")

async def level2():
    await level3()

async def level1():
    await level2()

async def main():
    await level1()

try:
    asyncio.run(main())
except Exception as e:
    # Stack trace может быть сложным
    import traceback
    traceback.print_exc()

Сравнение многозадачности

АспектThreadingAsyncioMultiprocessing
Race conditionsВысокий рискНетНет
Memory1MB/thread1KB/coroutine50MB/process
Context switchЕсть overheadНетБольшой overhead
I/O задачиХорошоИдеальноПлохо
CPU задачиПлохо (GIL)Плохо (GIL)Хорошо
СложностьСредняяВысокаяСредняя
СтабильностьВысокаяСредняяВысокая

Когда использовать

# 1. Asyncio: I/O-heavy, нужна масштабируемость
async def web_scraper():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# 2. Threading: простые I/O операции, легче интегрировать
def simple_io():
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(request, url) for url in urls]
        return [f.result() for f in futures]

# 3. Multiprocessing: CPU-bound работы
with ProcessPoolExecutor() as executor:
    results = executor.map(cpu_intensive, data_chunks)

Вывод: Кооперативная многозадачность (asyncio) отлична для I/O, но требует дисциплины и понимания event loop механики. Для простых случаев threading может быть проще.