← Назад к вопросам
Почему на потоках не работают с CPU bound нагрузкой в Python?
2.0 Middle🔥 201 комментариев
#Python Core
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Почему потоки не работают с CPU-bound нагрузкой в Python
Ответ: из-за GIL (Global Interpreter Lock) — мьютекса, который позволяет только одному потоку выполнять Python код в один момент времени.
Это означает, что потоки не могут выполняться параллельно для вычислений, только по очереди.
Визуализация: потоки vs процессы
ОДНОПОТОЧНОЕ ВЫПОЛНЕНИЕ (1 секунда):
[Thread 1: вычисление] [finished]
Время: 1 сек
ДВА ПОТОКА БЕЗ GIL (0.5 секунды):
Thread 1: [вычисление | вычисление]
Thread 2: [вычисление | вычисление]
Время: 0.5 сек (истинный параллелизм)
ДВА ПОТОКА С GIL В PYTHON (1+ секунды - МЕДЛЕННЕЕ!):
Thread 1: [вычисл] [ждёт] [вычисл] [ждёт]
Thread 2: [ждёт] [вычисл] [ждёт] [вычисл]
Время: 1+ сек (сериализовано, плюс overhead переключения)
ДВА ПРОЦЕССА (0.5 секунды):
Process 1: [вычисление | вычисление]
Process 2: [вычисление | вычисление]
Время: 0.5 сек (истинный параллелизм, но тяжелее)
Практический пример с измерением
import time
import threading
from multiprocessing import Process
def cpu_bound_task(n: int) -> int:
"""CPU-интенсивная операция"""
total = 0
for i in range(n):
total += i ** 2
return total
WORK_SIZE = 50_000_000
# 1. ОДНОПОТОЧНОЕ ВЫПОЛНЕНИЕ
print("1. Однопоточное:")
start = time.time()
cpu_bound_task(WORK_SIZE)
cpu_bound_task(WORK_SIZE)
time1 = time.time() - start
print(f" Время: {time1:.2f}s\n")
# 2. ПОТОКИ (медленнее из-за GIL!)
print("2. Потоки (threading):")
start = time.time()
t1 = threading.Thread(target=cpu_bound_task, args=(WORK_SIZE,))
t2 = threading.Thread(target=cpu_bound_task, args=(WORK_SIZE,))
t1.start()
t2.start()
t1.join()
t2.join()
time2 = time.time() - start
print(f" Время: {time2:.2f}s")
print(f" Медленнее в {time2/time1:.2f}x раз! (из-за GIL)\n")
# 3. ПРОЦЕССЫ (правильно для CPU-bound)
print("3. Процессы (multiprocessing):")
start = time.time()
p1 = Process(target=cpu_bound_task, args=(WORK_SIZE,))
p2 = Process(target=cpu_bound_task, args=(WORK_SIZE,))
p1.start()
p2.start()
p1.join()
p2.join()
time3 = time.time() - start
print(f" Время: {time3:.2f}s")
print(f" Ускорение vs однопоточное: {time1/time3:.2f}x раз")
# Output:
# 1. Однопоточное:
# Время: 6.50s
#
# 2. Потоки (threading):
# Время: 7.20s
# Медленнее в 1.11x раз! (из-за GIL)
#
# 3. Процессы (multiprocessing):
# Время: 3.40s
# Ускорение vs однопоточное: 1.91x раз
Что такое GIL (Global Interpreter Lock)?
GIL — это мьютекс (lock), который защищает доступ к объектам Python:
# Внутри CPython (интерпретатор на C):
# Только один поток может владеть GIL одновременно
Thread 1:
acquire_gil() # Захватываю GIL
x = x + 1 # Только я могу выполнять код
release_gil() # Отпускаю GIL
Thread 2:
# Ждёт GIL
# Ждёт GIL
# Ждёт GIL
acquire_gil() # Теперь я владею GIL
y = y + 2
release_gil()
Почему существует GIL?
Причина: управление памятью
# Python использует reference counting для garbage collection
class MyObject:
pass
obj = MyObject() # ref_count = 1
ref1 = obj # ref_count = 2
ref2 = obj # ref_count = 3
del ref1 # ref_count = 2
# Если нет GIL, и два потока одновременно удалят ссылки:
# Thread 1: ref_count = 3 -> 2
# Thread 2: ref_count = 2 -> 1 (в то же время)
# Результат: ref_count = 1 вместо правильного 1
# Или даже: ref_count = 0 -> объект удалится, но ref2 ещё его использует!
#
# GIL предотвращает это
Когда GIL отпускает контроль?
import threading
import time
def io_operation():
"""I/O операция — GIL отпускается!"""
time.sleep(1) # GIL освобождается на время sleep
print("Готово")
# GIL отпускается при I/O:
# - requests.get() (сетевой запрос)
# - open().read() (чтение файла)
# - time.sleep() (ожидание)
# GIL НЕ отпускается при:
# - for i in range(100_000_000): x += i
# - json.dumps(huge_dict)
# - вычисления в pure Python
I/O-bound vs CPU-bound
# I/O-BOUND (потоки работают хорошо)
print("I/O-bound: потоки эффективны")
def fetch_url(url: str):
import requests
response = requests.get(url) # GIL отпускается здесь!
return response.status_code
from concurrent.futures import ThreadPoolExecutor
import time
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(fetch_url, [
'https://example.com',
'https://google.com',
# ... ещё 8 URL
]))
print(f"I/O с потоками: {time.time() - start:.2f}s") # ~1-2 сек
# CPU-BOUND (процессы работают хорошо)
print("\nCPU-bound: процессы эффективны")
def cpu_task(n: int) -> int:
total = 0
for i in range(n): # GIL НЕ отпускается!
total += i
return total
from multiprocessing import Pool
start = time.time()
with Pool(processes=4) as pool:
results = pool.map(cpu_task, [10_000_000] * 4)
print(f"CPU с процессами: {time.time() - start:.2f}s") # ~0.5 сек
Решения для CPU-bound нагрузки
1. Multiprocessing (отдельные процессы)
from multiprocessing import Pool
import time
def heavy_computation(n: int) -> int:
total = 0
for i in range(n):
total += i ** 2
return total
if __name__ == '__main__':
start = time.time()
# Создаём пул из 4 процессов
with Pool(processes=4) as pool:
results = pool.map(heavy_computation, [50_000_000] * 4)
print(f"Время: {time.time() - start:.2f}s")
print(f"Результаты: {results}")
2. Cython или NumPy (уходят из GIL)
# МЕДЛЕННЫЙ Pure Python
result = sum(i ** 2 for i in range(1_000_000)) # ~100ms
# БЫСТРЫЙ NumPy (не использует GIL)
import numpy as np
result = np.sum(np.arange(1_000_000) ** 2) # ~1ms
# NumPy вычисления на C-уровне (GIL отпускается)
3. Распределённая обработка (Celery, Ray)
# Celery для распределённой работы
from celery import Celery
app = Celery('tasks')
@app.task
def cpu_intensive_task(n: int) -> int:
total = 0
for i in range(n):
total += i ** 2
return total
# В процессе, где есть Celery worker:
result = cpu_intensive_task.delay(50_000_000)
print(result.get())
# Ray для параллельных вычислений
import ray
@ray.remote
def compute(n: int) -> int:
total = 0
for i in range(n):
total += i ** 2
return total
ray.init()
results = ray.get([
compute.remote(50_000_000),
compute.remote(50_000_000),
])
ray.shutdown()
4. Асинхронность для I/O (asyncio)
import asyncio
import aiohttp
async def fetch_urls(urls: list) -> list:
"""Асинхронные HTTP запросы"""
async with aiohttp.ClientSession() as session:
tasks = [
session.get(url)
for url in urls
]
return await asyncio.gather(*tasks)
# Работает быстро потому что I/O
await fetch_urls([
'https://example.com',
'https://google.com',
])
Таблица: когда что использовать
+------------------+----------+--------+--------+----------+
| Тип задачи | threading| Process| asyncio| NumPy |
+------------------+----------+--------+--------+----------+
| I/O (сеть, файл) | ✅ хорошо | ⚠️ OK | ✅ ОК | - |
| CPU-bound | ❌ ПЛОХО | ✅ ОК | ❌ не помогает | ✅ ОК |
| Mixed | ⚠️ OK | ✅ ОК | ⚠️ OK | ✅ ОК |
| Простота | ✅ | ⚠️ | ⚠️ | ⚠️ |
+------------------+----------+--------+--------+----------+
Пример: правильный выбор
# ЗАДАЧА 1: Загрузить 100 URL
# РЕШЕНИЕ: потоки (I/O-bound)
from concurrent.futures import ThreadPoolExecutor
import requests
with ThreadPoolExecutor(max_workers=10) as executor:
responses = executor.map(requests.get, urls)
# ЗАДАЧА 2: Обработать 1 млн записей из БД
# РЕШЕНИЕ: NumPy или multiprocessing
import numpy as np
data = np.array(list(db.fetch_all()))
result = np.mean(data) # Быстро!
# ЗАДАЧА 3: Парсить 10 ГБ текста
# РЕШЕНИЕ: multiprocessing
from multiprocessing import Pool
with Pool(processes=4) as pool:
results = pool.map(parse_chunk, chunks)
Вывод
Потоки в Python НЕ работают с CPU-bound нагрузкой потому что:
- GIL позволяет только одному потоку выполнять Python код одновременно
- Потоки выполняются по очереди, а не параллельно
- Overhead переключения между потоками делает это медленнее однопоточного кода
Для CPU-bound используй:
- multiprocessing (отдельные процессы)
- NumPy/Cython (C-уровневые вычисления)
- Celery/Ray (распределённая обработка)
Для I/O-bound используй:
- threading (простые сетевые запросы)
- asyncio (асинхронные операции)
- requests + ThreadPoolExecutor