← Назад к вопросам

Что знаешь о работе с высокоуровневыми абстракциями, такими как пулы задач?

2.7 Senior🔥 111 комментариев
#Soft Skills#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Пулы задач и высокоуровневые абстракции для параллелизма

Пулы задач — это мощный инструмент для управления параллельным выполнением. За годы разработки я обнаружил, что это одна из самых важных концепций для оптимизации производительности. Давай разберёмся глубоко.

Что такое пул задач (Thread Pool)

Пул задач — это набор рабочих потоков, которые выполняют задачи из очереди.

Вместо создания нового потока для каждой задачи (дорого!), пул переиспользует существующие потоки:

# Плохо — создаёшь новый поток для каждой задачи
import threading
import time

def slow_task(n):
    time.sleep(1)
    return n ** 2

# Создаём новый поток для КАЖДОЙ задачи
threads = []
for i in range(100):
    t = threading.Thread(target=slow_task, args=(i,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()
# Проблема: создание/удаление 100 потоков дорого!
# Контекст-свитчинг замедляет всё
# Хорошо — используешь пул из 4-8 рабочих потоков
from concurrent.futures import ThreadPoolExecutor
import time

def slow_task(n):
    time.sleep(1)
    return n ** 2

# Пул из 4 рабочих потоков
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(slow_task, range(100)))
    # 100 задач обрабатываются в пуле из 4 потоков
    # Очень эффективно!

print(results)

Типы пулов в Python

1. ThreadPoolExecutor — для I/O операций

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_url(url):
    """Загружает URL"""
    response = requests.get(url, timeout=10)
    return response.status_code

urls = [
    'https://api.github.com/users/torvalds',
    'https://api.github.com/users/gvanrossum',
    'https://api.github.com/users/guido',
    # ... ещё 100 URL
]

# Параллельно загружаем все URL
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(fetch_url, urls))
    print(f"Загружено {sum(1 for r in results if r == 200)} успешно")

# Без пула это заняло бы 100+ секунд
# С пулом из 10 потоков займёт ~10 секунд

2. ProcessPoolExecutor — для CPU-интенсивных операций

from concurrent.futures import ProcessPoolExecutor
import math

def is_prime(n):
    """Проверяет, простое ли число (CPU-интенсивная операция)"""
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

numbers = range(1000000, 1001000)

# ProcessPoolExecutor — для CPU работы
# Потоки не помогут из-за GIL
with ProcessPoolExecutor(max_workers=4) as executor:
    primes = list(executor.map(is_prime, numbers))
    count = sum(primes)
    print(f"Найдено {count} простых чисел")

# Каждый процесс имеет свой GIL
# Настоящий параллелизм на многоядерных системах

3. asyncio — для асинхронных операций

import asyncio
import aiohttp

async def fetch_url(session, url):
    """Асинхронно загружает URL"""
    async with session.get(url, timeout=10) as response:
        return await response.text()

async def main():
    urls = [
        'https://api.github.com/users/torvalds',
        'https://api.github.com/users/gvanrossum',
        # ... ещё 100 URL
    ]
    
    # Асинхронный пул — внутри 1 поток, но 100 операций "одновременно"
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    return results

# asyncio очень быстро для I/O
await main()

Когда использовать что

choices = {
    "ThreadPoolExecutor": {
        "случаи": [
            "API запросы (HTTP клиент)",
            "Чтение файлов",
            "Работа с БД",
            "Сетевые операции"
        ],
        "плюсы": ["Просто", "Из коробки", "Блокирующие библиотеки"],
        "минусы": ["GIL ограничивает CPU", "Затраты на потоки"]
    },
    "ProcessPoolExecutor": {
        "случаи": [
            "Обработка изображений",
            "Научные вычисления",
            "Парсинг больших файлов",
            "Криптография"
        ],
        "плюсы": ["Настоящий параллелизм", "Обходит GIL"],
        "минусы": ["Медленнее потоков", "Сложнее с памятью", "IPC overhead"]
    },
    "asyncio": {
        "случаи": [
            "Веб-приложения (FastAPI, aiohttp)",
            "Множество I/O операций",
            "Real-time приложения",
            "WebSockets"
        ],
        "плюсы": ["Очень быстро", "Низкие затраты", "Масштабируемо"],
        "минусы": ["Нужна асинхронная библиотека", "Сложнее отладка"]
    }
}

Продвинутые паттерны

Паттерн 1: Map-Reduce с пулом

from concurrent.futures import ThreadPoolExecutor

def process_chunk(chunk):
    """Обрабатывает часть данных"""
    return sum(x ** 2 for x in chunk)

def map_reduce(data, num_workers=4, chunk_size=1000):
    """Распределённая обработка данных"""
    # Делим на части
    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
    
    # Map — обрабатываем в пуле
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        results = list(executor.map(process_chunk, chunks))
    
    # Reduce — суммируем
    return sum(results)

data = range(1000000)
result = map_reduce(data)
print(f"Сумма квадратов: {result}")

Паттерн 2: Асинхронный пул с retry

import asyncio
from aiohttp import ClientSession, ClientConnectorError

class AsyncPool:
    def __init__(self, max_workers=10):
        self.max_workers = max_workers
        self.semaphore = asyncio.Semaphore(max_workers)
    
    async def fetch_with_retry(self, session, url, retries=3):
        """Загружает с retry"""
        for attempt in range(retries):
            try:
                async with self.semaphore:
                    async with session.get(url, timeout=10) as response:
                        return await response.text()
            except ClientConnectorError:
                if attempt == retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)
    
    async def fetch_all(self, urls):
        """Загружает все URL с лимитом"""
        async with ClientSession() as session:
            tasks = [self.fetch_with_retry(session, url) for url in urls]
            return await asyncio.gather(*tasks, return_exceptions=True)

# Использование
pool = AsyncPool(max_workers=10)
results = await pool.fetch_all(urls)

Производительность

import time

# Сравнение подходов для 100 операций по 1 сек каждая

# Последовательно: 100 секунд
start = time.time()
for i in range(100):
    time.sleep(1)  # I/O операция
sequential_time = time.time() - start  # ~100s

# ThreadPool(10): 10 секунд
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    list(executor.map(lambda x: time.sleep(1), range(100)))
thread_pool_time = time.time() - start  # ~10s

# asyncio: 10 секунд
start = time.time()
async def async_ops():
    tasks = [asyncio.sleep(1) for _ in range(100)]
    await asyncio.gather(*tasks)

asyncio.run(async_ops())
async_time = time.time() - start  # ~10s

print(f"Последовательно: {sequential_time:.1f}s")
print(f"ThreadPool: {thread_pool_time:.1f}s (ускорение {sequential_time/thread_pool_time:.1f}x)")
print(f"asyncio: {async_time:.1f}s (ускорение {sequential_time/async_time:.1f}x)")

Важные моменты

1. Не переусложняй — начни с ThreadPoolExecutor:

# Для 90% случаев этого достаточно
with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
    results = list(executor.map(task, items))

2. Размер пула зависит от типа работы:

import os

# Для I/O: значительно больше ядер (10-100)
for_io = min(32, os.cpu_count() * 5)

# Для CPU: равно числу ядер
for_cpu = os.cpu_count() or 4

3. Мониторь и профилируй:

import cProfile

with ThreadPoolExecutor(max_workers=8) as executor:
    cProfile.run('list(executor.map(slow_task, range(1000)))')
    # Смотри результаты — может быть контекст-свитчинг?

Выводы

Пулы задач — это неотъемлемая часть масштабируемых приложений. Выбор правильного типа пула может ускорить приложение в 10-100 раз:

ThreadPoolExecutor для I/O-bound операций ✅ ProcessPoolExecutor для CPU-bound операций
asyncio для веб-приложений и масштабируемых систем

В production приложениях я часто использую комбинацию: asyncio для веб-фреймворка + ThreadPoolExecutor для блокирующих операций, которые нельзя переписать.