← Назад к вопросам

Для каких задач не подходит многопоточность

2.0 Middle🔥 61 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Для каких задач не подходит многопоточность

Многопоточность — мощный инструмент, но это не серебряная пуля. Есть задачи, где потоки создают проблемы вместо решений.

1. CPU-bound задачи в Python (проклятие GIL)

Global Interpreter Lock (GIL) — одна из самых важных особенностей CPython. Это блокировка, которая позволяет только одному потоку одновременно выполнять Python код.

# ❌ ПЛОХО: многопоточность не помогает
import threading
import time

def cpu_intensive(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

start = time.time()
threads = []
for _ in range(4):
    t = threading.Thread(target=cpu_intensive, args=(50_000_000,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Потоки: {time.time() - start}s")  # ~15 секунд (медленнее, чем без потоков!)

# ✅ ХОРОШО: multiprocessing обходит GIL
import multiprocessing

start = time.time()
with multiprocessing.Pool(4) as pool:
    results = pool.map(cpu_intensive, [50_000_000] * 4)

print(f"Процессы: {time.time() - start}s")  # ~4 секунды (линейное ускорение)

Почему потоки медленнее:

  • GIL приходится захватывать/отпускать на каждой операции
  • Context switching overhead
  • Синхронизация потоков добавляет задержку

Вывод: Для CPU-bound задач в Python используй multiprocessing, не threading.

2. Задачи с состоянием и shared memory

Многопоточность создаёт гонку данных (race conditions), если потоки изменяют общие данные.

# ❌ ПЛОХО: data race, результат непредсказуем
import threading

counter = 0

def increment():
    global counter
    for _ in range(1_000_000):
        counter += 1  # НЕ атомарная операция!

threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # Ожидали 4_000_000, получили 2_345_123 (случайный результат)

Почему это происходит:

counter += 1 на самом деле это:
1. Прочитай значение counter
2. Добавь 1
3. Запиши обратно

В многопоточной среде между шагами 1 и 3 другой поток может изменить counter.
Результат: потеря обновлений, corruption данных.

Решения:

# Вариант 1: Lock (Mutex)
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(1_000_000):
        with lock:
            counter += 1

# Вариант 2: Atomic операции
from threading import Lock
from queue import Queue

# Вариант 3: Избежать shared state вообще
def increment_local():
    total = 0
    for _ in range(1_000_000):
        total += 1
    return total

results = []
threads = [threading.Thread(target=lambda: results.append(increment_local())) for _ in range(4)]
# ... join ...
final_count = sum(results)  # Правильно и безопасно

3. Задачи с сложными зависимостями между потоками

Когда потоки зависят друг от друга, многопоточность создаёт deadlock и livelocks.

# ❌ ПЛОХО: deadlock
import threading

lock1 = threading.Lock()
lock2 = threading.Lock()

def thread_a():
    with lock1:
        print("A: захватил lock1")
        time.sleep(1)
        with lock2:  # Ждёт lock2, который удерживает thread_b
            print("A: захватил lock2")

def thread_b():
    with lock2:
        print("B: захватил lock2")
        time.sleep(1)
        with lock1:  # Ждёт lock1, который удерживает thread_a
            print("B: захватил lock1")

t1 = threading.Thread(target=thread_a)
t2 = threading.Thread(target=thread_b)
t1.start()
t2.start()
t1.join()  # Зависнет навечно (deadlock!)
t2.join()

Решение: Используй async/await вместо потоков для сложных зависимостей.

# ✅ ХОРОШО: async подход (нет deadlock)
import asyncio

async def task_a():
    async with lock1:
        print("A: захватил lock1")
        await asyncio.sleep(1)
    async with lock2:
        print("A: захватил lock2")

async def task_b():
    async with lock2:
        print("B: захватил lock2")
        await asyncio.sleep(1)
    async with lock1:
        print("B: захватил lock1")

asyncio.run(asyncio.gather(task_a(), task_b()))  # Работает без deadlock

4. I/O операции в Python (используй async вместо threads)

Для I/O потоки работают, но async лучше по производительности и коду.

# Плохо: потоки для I/O
import threading
import requests

def fetch_url(url):
    return requests.get(url).json()

threads = []
for url in urls:
    t = threading.Thread(target=fetch_url, args=(url,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# Хорошо: async для I/O
import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[fetch_url(session, url) for url in urls])

asyncio.run(fetch_all(urls))

Почему async лучше:

  • Меньше памяти (потокам нужен stack)
  • Проще отладка (no race conditions по умолчанию)
  • Выше throughput (миллионы корутин vs сотни потоков)
  • Явная точка переключения контекста (await)

5. Задачи реального времени и низкой latency

Многопоточность неопредсказуема из-за context switching.

# ❌ ПЛОХО: непредсказуемая latency
import threading
import time

def critical_task():
    start = time.time()
    # Критически важный код
    result = calculate()
    latency = time.time() - start
    print(f"Latency: {latency}s")  # Может быть 0.001s или 0.1s (случайно!)

# ✅ ХОРОШО: async с предсказуемыми точками переключения
async def critical_task():
    start = time.monotonic()
    # Критический код выполняется целиком
    result = calculate()
    # Явная точка переключения
    await asyncio.sleep(0)
    latency = time.monotonic() - start
    print(f"Latency: {latency}s")  # Предсказуемо

6. Задачи с большим количеством потоков

ОС неэффективна с тысячами потоков. Каждый поток требует ~1-2 MB памяти.

# ❌ ПЛОХО: 10 000 потоков
threads = []
for i in range(10_000):
    t = threading.Thread(target=do_something)
    threads.append(t)
    t.start()  # ~10-20 GB памяти, очень медленно

# ✅ ХОРОШО: asyncio с 10 000 корутин
async def main():
    tasks = [do_something() for _ in range(10_000)]
    await asyncio.gather(*tasks)  # ~50 MB памяти, быстро

7. GUI приложения

Большинство GUI фреймворков (Tkinter, PyQt) НЕ thread-safe.

# ❌ ПЛОХО: изменение GUI из другого потока
import threading
from tkinter import Tk, Label

root = Tk()
label = Label(root, text="Count: 0")
label.pack()

def update_gui():
    for i in range(10):
        label.config(text=f"Count: {i}")  # SegFault или undefined behavior
        time.sleep(1)

thread = threading.Thread(target=update_gui)
thread.start()
root.mainloop()

# ✅ ХОРОШО: использовать queue
import queue
import threading

q = queue.Queue()

def update_gui():
    try:
        message = q.get_nowait()
        label.config(text=message)
    except queue.Empty:
        pass
    root.after(100, update_gui)  # Проверяй очередь регулярно

root.mainloop()

Итоговая таблица

ЗадачаПочему не потокиАльтернатива
CPU вычисленияGIL блокируетmultiprocessing
Shared stateRace conditionsasync, queues, immutable data
Много параллизмаMemory overheadasyncio
Complex syncDeadlock riskasync/await
GUINot thread-safeQueues, callbacks
Real-time low latencyНепредсказуемоasync
Миллионы операцийOOMasyncio, event loop

Вывод

Используй потоки для: I/O операций (если нет возможности async), background работы, когда код already thread-safe.

НЕ используй потоки для: CPU-bound работы, сложного состояния, высоконагруженных приложений, GUI, real-time систем.

В 2024+ году async/await — это первый выбор для большинства задач в Python. Потоки — это legacy подход, который всё ещё полезен, но требует глубокого понимания о том, что ты делаешь.