← Назад к вопросам

Какие знаешь библиотеки для работы с многопоточностью?

2.0 Middle🔥 171 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Библиотеки для работы с многопоточностью в Python

Стандартная библиотека threading

Описание: Встроенная библиотека для создания и управления потоками.

import threading
import time

def worker(name, delay):
    time.sleep(delay)
    print(f"Worker {name} finished")

# Создаем потоки
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(f"T{i}", 1))
    t.start()
    threads.append(t)

# Ждем завершения всех потоков
for t in threads:
    t.join()

print("All threads completed")

Плюсы:

  • Встроена в стандартную библиотеку
  • Простая и прямолинейная
  • Достаточна для большинства задач

Минусы:

  • GIL ограничивает реальный параллелизм для CPU-bound задач
  • Нет встроенного пула потоков
  • Нужно вручную синхронизировать потоки

concurrent.futures.ThreadPoolExecutor

Описание: Пул потоков с удобным API для отправки задач.

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def download_url(url, timeout=5):
    return requests.get(url, timeout=timeout).text

urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
]

# Вариант 1: map() - сохраняет порядок
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(download_url, urls))
    for url, html in zip(urls, results):
        print(f"{url}: {len(html)} bytes")

# Вариант 2: as_completed() - обрабатывает по готовности
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(download_url, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            html = future.result()
            print(f"{url}: {len(html)} bytes")
        except Exception as e:
            print(f"Error downloading {url}: {e}")

Плюсы:

  • Простой и удобный API
  • Автоматическое управление потоками
  • Встроена в стандартную библиотеку

Минусы:

  • Все равно ограничена GIL для CPU-bound задач
  • Нет поддержки приоритетов задач

concurrent.futures.ProcessPoolExecutor

Описание: Пул процессов для CPU-bound задач (обходит GIL).

from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def cpu_intensive(n):
    """Вычисляем факториал - CPU-bound задача"""
    result = 1
    for i in range(1, n + 1):
        result *= i
    return result

if __name__ == "__main__":
    numbers = [1000, 2000, 3000, 4000]
    
    # ProcessPoolExecutor обходит GIL
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_intensive, numbers))
        for num, result in zip(numbers, results):
            print(f"factorial({num}) = {result}")

Плюсы:

  • Обходит GIL - реальный параллелизм
  • Использует все ядра процессора
  • Простой API как у ThreadPoolExecutor

Минусы:

  • Высокие затраты на создание процесса
  • Нельзя делиться памятью между процессами напрямую
  • Требует if name == "main" для Windows

Multiprocessing

Описание: Встроенная библиотека для работы с процессами.

from multiprocessing import Process, Pool, Queue
import time

# Вариант 1: Отдельные процессы
def worker(name, queue):
    for i in range(3):
        queue.put(f"{name}: {i}")
        time.sleep(0.5)

if __name__ == "__main__":
    q = Queue()
    
    processes = [
        Process(target=worker, args=(f"P{i}", q))
        for i in range(2)
    ]
    
    for p in processes:
        p.start()
    
    for _ in range(6):
        print(q.get())
    
    for p in processes:
        p.join()

# Вариант 2: Пул процессов
if __name__ == "__main__":
    with Pool(processes=4) as pool:
        results = pool.map(cpu_intensive, [100, 200, 300])
        print(results)

Плюсы:

  • Полный контроль над процессами
  • Обходит GIL
  • Поддержка очередей для синхронизации

Минусы:

  • Больше кода для управления
  • Высокие затраты на создание процесса
  • Сложнее отлаживать

asyncio - асинхронное программирование

Описание: Встроенная библиотека для асинхронного I/O в одном потоке.

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://example.com/1",
        "https://example.com/2",
        "https://example.com/3",
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, html in zip(urls, results):
            print(f"{url}: {len(html)} bytes")

# Запуск
asyncio.run(main())

Плюсы:

  • Очень высокая производительность для I/O
  • Можно обработать тысячи соединений в одном потоке
  • Встроена в стандартную библиотеку
  • Низкое потребление памяти

Минусы:

  • Требует переписания кода (async/await)
  • Не обходит GIL для CPU-bound задач
  • Кривая обучения

APScheduler

Описание: Библиотека для планирования и выполнения задач в фоне.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import time

def my_task():
    print(f"Task executed at {time.time()}")

scheduler = BackgroundScheduler()

# Выполнить каждые 5 секунд
scheduler.add_job(my_task, "interval", seconds=5)

# Выполнить каждый день в 12:00
scheduler.add_job(
    my_task,
    CronTrigger(hour=12, minute=0)
)

scheduler.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()

Плюсы:

  • Гибкое планирование (cron, interval, at)
  • Работает с потоками и процессами
  • Персистенция задач

Минусы:

  • Требует настройки
  • Не распределенное (одна машина)

Celery

Описание: Распределенная система управления задачами.

from celery import Celery
from celery.result import AsyncResult

app = Celery("tasks", broker="redis://localhost:6379")

@app.task
def long_running_task(x, y):
    time.sleep(10)
    return x + y

@app.task
def send_email(email, subject):
    # Отправляем письмо асинхронно
    pass

# В основном приложении
result = long_running_task.delay(5, 10)  # Асинхронно
print(result.id)  # Получить ID задачи

# Проверить статус
result = AsyncResult("task_id")
print(result.status)  # "PENDING", "SUCCESS", "FAILURE"
print(result.result)  # Результат

# Отправить письмо
send_email.delay("user@example.com", "Hello")

Плюсы:

  • Распределенные задачи на нескольких машинах
  • Очереди задач (приоритеты, отложенное выполнение)
  • Retry логика
  • Мониторинг (Flower)

Минусы:

  • Требует message broker (Redis, RabbitMQ)
  • Сложнее настраивать
  • Overhead для простых задач

Выбор инструмента

СлучайИнструмент
I/O-bound (сеть, файлы)asyncio или ThreadPoolExecutor
CPU-bound (вычисления)ProcessPoolExecutor или multiprocessing
Планирование задачAPScheduler
Распределенные задачиCelery
Простые потокиthreading
Пул потоковThreadPoolExecutor

Сравнение производительности

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio
import aiohttp

# I/O bound: async лучше всех
async def io_bound_async():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        await asyncio.gather(*tasks)

# CPU bound: Process лучше Thread
def cpu_bound_process():
    with ProcessPoolExecutor(max_workers=4) as executor:
        list(executor.map(cpu_intensive, numbers))

# Результаты (примерно):
# async I/O:      0.5 сек (1000 запросов)
# thread I/O:     2.0 сек (1000 запросов)
# async CPU:      10 сек (GIL все равно мешает)
# thread CPU:     10 сек (GIL мешает)
# process CPU:    2.5 сек (обходит GIL, 4 ядра)

Выбор зависит от типа задачи и требований к производительности.

Какие знаешь библиотеки для работы с многопоточностью? | PrepBro