← Назад к вопросам
Какие знаешь библиотеки для работы с многопоточностью?
2.0 Middle🔥 171 комментариев
#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Библиотеки для работы с многопоточностью в Python
Стандартная библиотека threading
Описание: Встроенная библиотека для создания и управления потоками.
import threading
import time
def worker(name, delay):
time.sleep(delay)
print(f"Worker {name} finished")
# Создаем потоки
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(f"T{i}", 1))
t.start()
threads.append(t)
# Ждем завершения всех потоков
for t in threads:
t.join()
print("All threads completed")
Плюсы:
- Встроена в стандартную библиотеку
- Простая и прямолинейная
- Достаточна для большинства задач
Минусы:
- GIL ограничивает реальный параллелизм для CPU-bound задач
- Нет встроенного пула потоков
- Нужно вручную синхронизировать потоки
concurrent.futures.ThreadPoolExecutor
Описание: Пул потоков с удобным API для отправки задач.
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def download_url(url, timeout=5):
return requests.get(url, timeout=timeout).text
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
]
# Вариант 1: map() - сохраняет порядок
with ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(download_url, urls))
for url, html in zip(urls, results):
print(f"{url}: {len(html)} bytes")
# Вариант 2: as_completed() - обрабатывает по готовности
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(download_url, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
html = future.result()
print(f"{url}: {len(html)} bytes")
except Exception as e:
print(f"Error downloading {url}: {e}")
Плюсы:
- Простой и удобный API
- Автоматическое управление потоками
- Встроена в стандартную библиотеку
Минусы:
- Все равно ограничена GIL для CPU-bound задач
- Нет поддержки приоритетов задач
concurrent.futures.ProcessPoolExecutor
Описание: Пул процессов для CPU-bound задач (обходит GIL).
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
def cpu_intensive(n):
"""Вычисляем факториал - CPU-bound задача"""
result = 1
for i in range(1, n + 1):
result *= i
return result
if __name__ == "__main__":
numbers = [1000, 2000, 3000, 4000]
# ProcessPoolExecutor обходит GIL
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_intensive, numbers))
for num, result in zip(numbers, results):
print(f"factorial({num}) = {result}")
Плюсы:
- Обходит GIL - реальный параллелизм
- Использует все ядра процессора
- Простой API как у ThreadPoolExecutor
Минусы:
- Высокие затраты на создание процесса
- Нельзя делиться памятью между процессами напрямую
- Требует if name == "main" для Windows
Multiprocessing
Описание: Встроенная библиотека для работы с процессами.
from multiprocessing import Process, Pool, Queue
import time
# Вариант 1: Отдельные процессы
def worker(name, queue):
for i in range(3):
queue.put(f"{name}: {i}")
time.sleep(0.5)
if __name__ == "__main__":
q = Queue()
processes = [
Process(target=worker, args=(f"P{i}", q))
for i in range(2)
]
for p in processes:
p.start()
for _ in range(6):
print(q.get())
for p in processes:
p.join()
# Вариант 2: Пул процессов
if __name__ == "__main__":
with Pool(processes=4) as pool:
results = pool.map(cpu_intensive, [100, 200, 300])
print(results)
Плюсы:
- Полный контроль над процессами
- Обходит GIL
- Поддержка очередей для синхронизации
Минусы:
- Больше кода для управления
- Высокие затраты на создание процесса
- Сложнее отлаживать
asyncio - асинхронное программирование
Описание: Встроенная библиотека для асинхронного I/O в одном потоке.
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://example.com/1",
"https://example.com/2",
"https://example.com/3",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, html in zip(urls, results):
print(f"{url}: {len(html)} bytes")
# Запуск
asyncio.run(main())
Плюсы:
- Очень высокая производительность для I/O
- Можно обработать тысячи соединений в одном потоке
- Встроена в стандартную библиотеку
- Низкое потребление памяти
Минусы:
- Требует переписания кода (async/await)
- Не обходит GIL для CPU-bound задач
- Кривая обучения
APScheduler
Описание: Библиотека для планирования и выполнения задач в фоне.
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import time
def my_task():
print(f"Task executed at {time.time()}")
scheduler = BackgroundScheduler()
# Выполнить каждые 5 секунд
scheduler.add_job(my_task, "interval", seconds=5)
# Выполнить каждый день в 12:00
scheduler.add_job(
my_task,
CronTrigger(hour=12, minute=0)
)
scheduler.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
scheduler.shutdown()
Плюсы:
- Гибкое планирование (cron, interval, at)
- Работает с потоками и процессами
- Персистенция задач
Минусы:
- Требует настройки
- Не распределенное (одна машина)
Celery
Описание: Распределенная система управления задачами.
from celery import Celery
from celery.result import AsyncResult
app = Celery("tasks", broker="redis://localhost:6379")
@app.task
def long_running_task(x, y):
time.sleep(10)
return x + y
@app.task
def send_email(email, subject):
# Отправляем письмо асинхронно
pass
# В основном приложении
result = long_running_task.delay(5, 10) # Асинхронно
print(result.id) # Получить ID задачи
# Проверить статус
result = AsyncResult("task_id")
print(result.status) # "PENDING", "SUCCESS", "FAILURE"
print(result.result) # Результат
# Отправить письмо
send_email.delay("user@example.com", "Hello")
Плюсы:
- Распределенные задачи на нескольких машинах
- Очереди задач (приоритеты, отложенное выполнение)
- Retry логика
- Мониторинг (Flower)
Минусы:
- Требует message broker (Redis, RabbitMQ)
- Сложнее настраивать
- Overhead для простых задач
Выбор инструмента
| Случай | Инструмент |
|---|---|
| I/O-bound (сеть, файлы) | asyncio или ThreadPoolExecutor |
| CPU-bound (вычисления) | ProcessPoolExecutor или multiprocessing |
| Планирование задач | APScheduler |
| Распределенные задачи | Celery |
| Простые потоки | threading |
| Пул потоков | ThreadPoolExecutor |
Сравнение производительности
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio
import aiohttp
# I/O bound: async лучше всех
async def io_bound_async():
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
await asyncio.gather(*tasks)
# CPU bound: Process лучше Thread
def cpu_bound_process():
with ProcessPoolExecutor(max_workers=4) as executor:
list(executor.map(cpu_intensive, numbers))
# Результаты (примерно):
# async I/O: 0.5 сек (1000 запросов)
# thread I/O: 2.0 сек (1000 запросов)
# async CPU: 10 сек (GIL все равно мешает)
# thread CPU: 10 сек (GIL мешает)
# process CPU: 2.5 сек (обходит GIL, 4 ядра)
Выбор зависит от типа задачи и требований к производительности.