← Назад к вопросам
Можно ли на потоках распараллелить задачу?
2.3 Middle🔥 131 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Можно ли на потоках распараллелить задачу?
В Python ответ сложнее, чем просто «да» или «нет». Всё зависит от типа задачи и особенности GIL (Global Interpreter Lock).
Суть проблемы: GIL
Global Interpreter Lock — это мьютекс в CPython, который позволяет только одному потоку выполнять байт-код одновременно:
import threading
import time
def cpu_bound_task(n):
"""Вычисления — привязано к CPU"""
result = 0
for i in range(n):
result += i
return result
def io_bound_task(delay):
"""I/O операция — ждёт ответа"""
time.sleep(delay)
return "Done"
# ❌ Потоки НЕ помогут для CPU-bound
start = time.time()
threads = []
for i in range(4):
t = threading.Thread(target=cpu_bound_task, args=(100_000_000,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"4 потока для CPU: {time.time() - start:.2f}s") # ~20 сек (медленнее!)
# ✅ Потоки помогут для I/O-bound
start = time.time()
threads = []
for i in range(4):
t = threading.Thread(target=io_bound_task, args=(1,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"4 потока для I/O: {time.time() - start:.2f}s") # ~1 сек (быстро!)
1. Потоки для I/O-bound задач (эффективно)
Это хороший случай для потоков:
import threading
import requests
from concurrent.futures import ThreadPoolExecutor
class DataFetcher:
def fetch_url(self, url):
"""Блокирующий I/O запрос"""
response = requests.get(url, timeout=5)
return response.json()
def fetch_multiple_threads(self, urls):
"""Параллельно с потоками"""
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(self.fetch_url, urls))
print(f"Потоки: {time.time() - start:.2f}s")
return results
fetcher = DataFetcher()
urls = [
'https://api.github.com/users/github',
'https://api.github.com/users/google',
'https://api.github.com/users/microsoft',
'https://api.github.com/users/apple',
]
results = fetcher.fetch_multiple_threads(urls)
# Время: ~1 сек (4 запроса параллельно вместо 4сек последовательно)
2. Потоки НЕ помогут для CPU-bound (неэффективно)
Для вычислений нужны процессы:
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
def cpu_bound(n):
"""Интенсивные вычисления"""
result = 0
for i in range(n):
result += i ** 2
return result
data = [100_000_000] * 4
# ❌ Потоки — медленнее (GIL блокирует)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_bound, data))
thread_time = time.time() - start
print(f"ThreadPool: {thread_time:.2f}s") # ~25 сек
# ✅ Процессы — быстро (разные GIL для каждого процесса)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_bound, data))
process_time = time.time() - start
print(f"ProcessPool: {process_time:.2f}s") # ~7 сек (3.5x быстрее!)
3. AsyncIO вместо потоков (ещё лучше для I/O)
Асинхронный код часто эффективнее потоков:
import asyncio
import aiohttp
from time import time
class AsyncFetcher:
async def fetch_url(self, session, url):
"""Асинхронный запрос"""
async with session.get(url) as response:
return await response.json()
async def fetch_multiple_async(self, urls):
"""Асинхронно с asyncio"""
start = time()
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"AsyncIO: {time() - start:.2f}s")
return results
fetcher = AsyncFetcher()
urls = [
'https://api.github.com/users/github',
'https://api.github.com/users/google',
'https://api.github.com/users/microsoft',
]
# Скорость почти такая же как потоки, но меньше overhead
asyncio.run(fetcher.fetch_multiple_async(urls))
# Время: ~1 сек
4. Практический пример: веб-скрейпинг
Сравним все три подхода:
import threading
import multiprocessing
import asyncio
import requests
import aiohttp
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from time import time
urls = [f'https://httpbin.org/delay/1' for _ in range(8)]
# 1. Последовательно (baseline)
def sequential():
start = time()
for url in urls:
requests.get(url)
return time() - start
# 2. Потоки (хорошо для I/O)
def with_threads():
start = time()
with ThreadPoolExecutor(max_workers=4) as executor:
list(executor.map(requests.get, urls))
return time() - start
# 3. AsyncIO (лучше для I/O)
async def with_asyncio():
start = time()
async with aiohttp.ClientSession() as session:
await asyncio.gather(*[
session.get(url) for url in urls
])
return time() - start
print(f"Последовательно: {sequential():.2f}s") # ~8 сек
print(f"Потоки: {with_threads():.2f}s") # ~2 сек (4x быстрее)
print(f"AsyncIO: {asyncio.run(with_asyncio()):.2f}s") # ~2 сек (4x быстрее)
5. Когда потоки работают хорошо
# ✅ Хорошие примеры для потоков
from concurrent.futures import ThreadPoolExecutor
# 1. Database queries
with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(db_query, user_ids)
# 2. File I/O
with ThreadPoolExecutor(max_workers=4) as executor:
files = executor.map(read_file, file_paths)
# 3. Network requests
with ThreadPoolExecutor(max_workers=10) as executor:
responses = executor.map(requests.get, urls)
# 4. API calls
with ThreadPoolExecutor(max_workers=8) as executor:
results = executor.map(call_external_api, items)
6. Когда потоки НЕ работают
# ❌ Плохие примеры для потоков
from concurrent.futures import ProcessPoolExecutor
def matrix_multiply(matrix):
"""CPU-bound: нужны процессы"""
return matrix @ matrix
def calculate_primes(n):
"""CPU-bound: нужны процессы"""
primes = []
for num in range(2, n):
if all(num % i != 0 for i in range(2, num)):
primes.append(num)
return primes
# Используем ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(calculate_primes, [100000] * 4)
7. Race Conditions при работе с потоками
Нужно защищать общие данные:
import threading
# ❌ Race condition
class Counter:
def __init__(self):
self.value = 0
def increment(self):
# Не атомарная операция!
temp = self.value
temp += 1
self.value = temp
counter = Counter()
threads = []
for _ in range(4):
t = threading.Thread(target=lambda: [
counter.increment() for _ in range(100_000)
])
threads.append(t)
t.start()
for t in threads:
t.join()
print(counter.value) # Может быть < 400_000 (race condition!)
# ✅ Защита с Lock
class SafeCounter:
def __init__(self):
self.value = 0
self.lock = threading.Lock()
def increment(self):
with self.lock:
self.value += 1
counter = SafeCounter()
threads = []
for _ in range(4):
t = threading.Thread(target=lambda: [
counter.increment() for _ in range(100_000)
])
threads.append(t)
t.start()
for t in threads:
t.join()
print(counter.value) # Всегда 400_000
Рекомендуемые паттерны
# 1. I/O-bound + простые запросы
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(requests.get, urls)
# 2. I/O-bound + async
import asyncio
await asyncio.gather(*tasks)
# 3. CPU-bound
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(expensive_calculation, data)
# 4. Смешанные задачи
# Используй ThreadPoolExecutor для I/O блоков
# Используй ProcessPoolExecutor для CPU блоков
Итоговый ответ
Да, но с оговорками:
- ✅ Потоки хорошо работают для I/O-bound задач (сеть, БД, файлы)
- ❌ Потоки НЕ работают для CPU-bound задач (вычисления) из-за GIL
- ⚡ AsyncIO ещё лучше для I/O чем потоки
- ⚙️ Процессы нужны для CPU-bound параллелизма
Правило: если задача ждёт ввода-вывода (I/O) — используй потоки. Если вычисления (CPU) — используй процессы.