← Назад к вопросам

Можно ли на потоках распараллелить задачу?

2.3 Middle🔥 131 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Можно ли на потоках распараллелить задачу?

В Python ответ сложнее, чем просто «да» или «нет». Всё зависит от типа задачи и особенности GIL (Global Interpreter Lock).

Суть проблемы: GIL

Global Interpreter Lock — это мьютекс в CPython, который позволяет только одному потоку выполнять байт-код одновременно:

import threading
import time

def cpu_bound_task(n):
    """Вычисления — привязано к CPU"""
    result = 0
    for i in range(n):
        result += i
    return result

def io_bound_task(delay):
    """I/O операция — ждёт ответа"""
    time.sleep(delay)
    return "Done"

# ❌ Потоки НЕ помогут для CPU-bound
start = time.time()
threads = []
for i in range(4):
    t = threading.Thread(target=cpu_bound_task, args=(100_000_000,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"4 потока для CPU: {time.time() - start:.2f}s")  # ~20 сек (медленнее!)

# ✅ Потоки помогут для I/O-bound
start = time.time()
threads = []
for i in range(4):
    t = threading.Thread(target=io_bound_task, args=(1,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"4 потока для I/O: {time.time() - start:.2f}s")  # ~1 сек (быстро!)

1. Потоки для I/O-bound задач (эффективно)

Это хороший случай для потоков:

import threading
import requests
from concurrent.futures import ThreadPoolExecutor

class DataFetcher:
    def fetch_url(self, url):
        """Блокирующий I/O запрос"""
        response = requests.get(url, timeout=5)
        return response.json()
    
    def fetch_multiple_threads(self, urls):
        """Параллельно с потоками"""
        start = time.time()
        
        with ThreadPoolExecutor(max_workers=4) as executor:
            results = list(executor.map(self.fetch_url, urls))
        
        print(f"Потоки: {time.time() - start:.2f}s")
        return results

fetcher = DataFetcher()
urls = [
    'https://api.github.com/users/github',
    'https://api.github.com/users/google',
    'https://api.github.com/users/microsoft',
    'https://api.github.com/users/apple',
]

results = fetcher.fetch_multiple_threads(urls)
# Время: ~1 сек (4 запроса параллельно вместо 4сек последовательно)

2. Потоки НЕ помогут для CPU-bound (неэффективно)

Для вычислений нужны процессы:

import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time

def cpu_bound(n):
    """Интенсивные вычисления"""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

data = [100_000_000] * 4

# ❌ Потоки — медленнее (GIL блокирует)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_bound, data))
thread_time = time.time() - start
print(f"ThreadPool: {thread_time:.2f}s")  # ~25 сек

# ✅ Процессы — быстро (разные GIL для каждого процесса)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_bound, data))
process_time = time.time() - start
print(f"ProcessPool: {process_time:.2f}s")  # ~7 сек (3.5x быстрее!)

3. AsyncIO вместо потоков (ещё лучше для I/O)

Асинхронный код часто эффективнее потоков:

import asyncio
import aiohttp
from time import time

class AsyncFetcher:
    async def fetch_url(self, session, url):
        """Асинхронный запрос"""
        async with session.get(url) as response:
            return await response.json()
    
    async def fetch_multiple_async(self, urls):
        """Асинхронно с asyncio"""
        start = time()
        
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        
        print(f"AsyncIO: {time() - start:.2f}s")
        return results

fetcher = AsyncFetcher()
urls = [
    'https://api.github.com/users/github',
    'https://api.github.com/users/google',
    'https://api.github.com/users/microsoft',
]

# Скорость почти такая же как потоки, но меньше overhead
asyncio.run(fetcher.fetch_multiple_async(urls))
# Время: ~1 сек

4. Практический пример: веб-скрейпинг

Сравним все три подхода:

import threading
import multiprocessing
import asyncio
import requests
import aiohttp
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from time import time

urls = [f'https://httpbin.org/delay/1' for _ in range(8)]

# 1. Последовательно (baseline)
def sequential():
    start = time()
    for url in urls:
        requests.get(url)
    return time() - start

# 2. Потоки (хорошо для I/O)
def with_threads():
    start = time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        list(executor.map(requests.get, urls))
    return time() - start

# 3. AsyncIO (лучше для I/O)
async def with_asyncio():
    start = time()
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*[
            session.get(url) for url in urls
        ])
    return time() - start

print(f"Последовательно: {sequential():.2f}s")  # ~8 сек
print(f"Потоки: {with_threads():.2f}s")  # ~2 сек (4x быстрее)
print(f"AsyncIO: {asyncio.run(with_asyncio()):.2f}s")  # ~2 сек (4x быстрее)

5. Когда потоки работают хорошо

# ✅ Хорошие примеры для потоков
from concurrent.futures import ThreadPoolExecutor

# 1. Database queries
with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(db_query, user_ids)

# 2. File I/O
with ThreadPoolExecutor(max_workers=4) as executor:
    files = executor.map(read_file, file_paths)

# 3. Network requests
with ThreadPoolExecutor(max_workers=10) as executor:
    responses = executor.map(requests.get, urls)

# 4. API calls
with ThreadPoolExecutor(max_workers=8) as executor:
    results = executor.map(call_external_api, items)

6. Когда потоки НЕ работают

# ❌ Плохие примеры для потоков
from concurrent.futures import ProcessPoolExecutor

def matrix_multiply(matrix):
    """CPU-bound: нужны процессы"""
    return matrix @ matrix

def calculate_primes(n):
    """CPU-bound: нужны процессы"""
    primes = []
    for num in range(2, n):
        if all(num % i != 0 for i in range(2, num)):
            primes.append(num)
    return primes

# Используем ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=4) as executor:
    results = executor.map(calculate_primes, [100000] * 4)

7. Race Conditions при работе с потоками

Нужно защищать общие данные:

import threading

# ❌ Race condition
class Counter:
    def __init__(self):
        self.value = 0
    
    def increment(self):
        # Не атомарная операция!
        temp = self.value
        temp += 1
        self.value = temp

counter = Counter()
threads = []
for _ in range(4):
    t = threading.Thread(target=lambda: [
        counter.increment() for _ in range(100_000)
    ])
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(counter.value)  # Может быть < 400_000 (race condition!)

# ✅ Защита с Lock
class SafeCounter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()
    
    def increment(self):
        with self.lock:
            self.value += 1

counter = SafeCounter()
threads = []
for _ in range(4):
    t = threading.Thread(target=lambda: [
        counter.increment() for _ in range(100_000)
    ])
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(counter.value)  # Всегда 400_000

Рекомендуемые паттерны

# 1. I/O-bound + простые запросы
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(requests.get, urls)

# 2. I/O-bound + async
import asyncio
await asyncio.gather(*tasks)

# 3. CPU-bound
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=4) as executor:
    results = executor.map(expensive_calculation, data)

# 4. Смешанные задачи
# Используй ThreadPoolExecutor для I/O блоков
# Используй ProcessPoolExecutor для CPU блоков

Итоговый ответ

Да, но с оговорками:

  • ✅ Потоки хорошо работают для I/O-bound задач (сеть, БД, файлы)
  • ❌ Потоки НЕ работают для CPU-bound задач (вычисления) из-за GIL
  • ⚡ AsyncIO ещё лучше для I/O чем потоки
  • ⚙️ Процессы нужны для CPU-bound параллелизма

Правило: если задача ждёт ввода-вывода (I/O) — используй потоки. Если вычисления (CPU) — используй процессы.