← Назад к вопросам

Какие особенности многопоточности в Python?

2.2 Middle🔥 251 комментариев
#Python Core#Архитектура и паттерны#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Особенности многопоточности в Python

Многопоточность в Python — это одна из самых мальпонимаемых тем. Каждый разработчик, работавший с Python, встречал неожиданные проблемы. Вот что я знаю из опыта проектирования высоконагруженных систем.

1. Global Interpreter Lock (GIL) — главная особенность

В CPython существует глобальная блокировка, которая предотвращает истинный параллелизм:

import threading
import time

def cpu_work():
    """CPU-bound операция"""
    count = 0
    for i in range(100_000_000):
        count += i
    return count

# Однопоточное выполнение
start = time.time()
result1 = cpu_work()
result2 = cpu_work()
print(f"Однопоточно: {time.time() - start:.2f}s")  # ~8s

# Попытка многопоточности
start = time.time()
t1 = threading.Thread(target=cpu_work)
t2 = threading.Thread(target=cpu_work)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Многопоточно: {time.time() - start:.2f}s")  # ~16s (медленнее!)

# GIL не позволяет обеим потокам работать одновременно
# Каждый поток должен дождаться, пока другой отпустит GIL

Почему GIL существует? Потому что управление памятью в CPython использует подсчёт ссылок (reference counting), а не сборку мусора. Без GIL каждое изменение требовало бы глобальную синхронизацию.

2. I/O-bound операции НЕ ограничены GIL

Для операций ввода-вывода многопоточность работает отлично:

import threading
import time
import requests

def fetch_url(url):
    """I/O-bound операция"""
    response = requests.get(url, timeout=5)
    return len(response.content)

urls = ['https://httpbin.org/delay/2'] * 10

# Однопоточно
start = time.time()
for url in urls:
    fetch_url(url)
print(f"Однопоточно: {time.time() - start:.2f}s")  # ~20s

# Многопоточно
start = time.time()
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Многопоточно: {time.time() - start:.2f}s")  # ~2s (в 10 раз быстрее!)

# Пока один поток ждёт ответа, другой может работать
# GIL отпускается при I/O операциях

3. Threading vs Multiprocessing

При CPU-bound нужен multiprocessing:

from multiprocessing import Pool
import threading
import time

def cpu_task(n):
    """Heavy CPU calculation"""
    return sum(i*i for i in range(n))

task_data = [100_000_000] * 4

# Threading — медленно
start = time.time()
threads = [threading.Thread(target=cpu_task, args=(n,)) for n in task_data]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Threading: {time.time() - start:.2f}s")  # ~25s

# Multiprocessing — быстро
start = time.time()
with Pool(4) as p:
    results = p.map(cpu_task, task_data)
print(f"Multiprocessing: {time.time() - start:.2f}s")  # ~7s (в 3.5 раза быстрее)

# Каждый процесс имеет свой GIL

4. GIL отпускается при встроенных операциях

При вызове C-кода GIL может быть отпущен:

import threading
import time
import math

def compute_math():
    """Встроенные математические функции не требуют GIL"""
    for _ in range(100_000_000):
        math.sqrt(2)

# Однопоточно
start = time.time()
compute_math()
compute_math()
print(f"Однопоточно: {time.time() - start:.2f}s")  # ~5s

# Многопоточно
start = time.time()
t1 = threading.Thread(target=compute_math)
t2 = threading.Thread(target=compute_math)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Многопоточно: {time.time() - start:.2f}s")  # ~3s (быстрее!)

# math.sqrt() вызывает C-код, при котором GIL отпускается

5. Race Conditions — основная опасность

Многопоточность создаёт проблемы с конкурентным доступом:

import threading

class Counter:
    def __init__(self):
        self.value = 0

counter = Counter()

def increment():
    """Выглядит просто, но это 3 операции:
    1. Прочитать значение
    2. Добавить 1
    3. Записать значение
    """
    for _ in range(1_000_000):
        # ❌ Race condition
        counter.value += 1

threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Expected: 4_000_000, Got: {counter.value}")
# Результат: 1_234_567 (случайное число, не 4 млн!)

# ✅ Решение — использовать Lock
from threading import Lock

class SafeCounter:
    def __init__(self):
        self.value = 0
        self._lock = Lock()
    
    def increment(self):
        with self._lock:
            self.value += 1

counter = SafeCounter()
threads = [threading.Thread(target=lambda: [counter.increment() for _ in range(1_000_000)]) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Safe: {counter.value}")  # 4_000_000

6. Daemon потоки — фоновые потоки

import threading
import time

def daemon_worker():
    """Фоновый рабочий процесс"""
    for i in range(100):
        time.sleep(0.1)
        print(f"Daemon: {i}")

def main_worker():
    """Основной процесс"""
    time.sleep(1)
    print("Main finished")

# Daemon поток
t_daemon = threading.Thread(target=daemon_worker, daemon=True)
t_daemon.start()

# Основной поток
t_main = threading.Thread(target=main_worker)
t_main.start()
t_main.join()

# Программа завершится, когда закончатся основные потоки
# Daemon потоки будут прерваны
print("Program exit")

7. Thread Local Storage — данные для каждого потока

import threading

thread_local = threading.local()

def worker(user_id):
    # Каждый поток имеет собственный thread_local
    thread_local.user_id = user_id
    thread_local.db_connection = f"conn_{user_id}"
    
    # Другие потоки не видят эти значения
    print(f"Thread {threading.current_thread().name}: user={thread_local.user_id}")
    
    # Используем соединение
    query_result = f"SELECT FROM {thread_local.db_connection}"
    print(query_result)

threads = [threading.Thread(target=worker, args=(i,), name=f"Worker-{i}") for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

8. Queue — потокобезопасная передача данных

from queue import Queue
import threading
import time

work_queue = Queue(maxsize=100)
results = Queue()
errors = 0

def producer():
    """Производитель — добавляет задачи"""
    for i in range(100):
        work_queue.put(f"task_{i}")
        time.sleep(0.01)

def consumer(worker_id):
    """Потребитель — обрабатывает задачи"""
    while True:
        try:
            task = work_queue.get(timeout=1)
            if task is None:
                break
            # Обработка
            result = f"result_{task}"
            results.put((worker_id, result))
            work_queue.task_done()
        except:
            break

# Создаём потоки
p_thread = threading.Thread(target=producer)
c_threads = [threading.Thread(target=consumer, args=(i,)) for i in range(4)]

p_thread.start()
for t in c_threads:
    t.start()

p_thread.join()
work_queue.join()  # Ждём, пока все задачи обработаны

for _ in range(4):
    work_queue.put(None)  # Stop signal

for t in c_threads:
    t.join()

print(f"Results: {results.qsize()}")

9. Asyncio вместо Threading

Для I/O-bound операций asyncio часто лучше:

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ['https://httpbin.org/delay/2'] * 10
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

# Все 10 запросов параллельны в одном потоке
results = asyncio.run(main())
print(f"Fetched {len(results)} URLs")

# Asyncio:
# - Один поток (нет context switching)
# - Выше throughput
# - Проще отладить
# - Менее голодный по памяти

10. Python 3.13+ — удаление GIL в экспериментальном режиме

# Python 3.13 может запуститься БЕЗ GIL:
# python -X gil=0 your_script.py

# Но это НЕ решает все проблемы потокобезопасности
# Race conditions всё равно существуют!

import threading

counter = 0

def increment():
    global counter
    for _ in range(1_000_000):
        counter += 1  # Всё ещё race condition!

threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # Может быть не 4_000_000 даже без GIL!

Рекомендации для многопоточности в Python

  1. CPU-bound → multiprocessing — обходим GIL
  2. I/O-bound → asyncio — эффективнее threading
  3. Threading только для блокирующего I/O — файлы, сеть, БД
  4. Всегда используйте Lock — для доступа к общему состоянию
  5. Queue вместо прямого доступа — безопаснее
  6. Thread-local для контекстных данных — пользователь, соединение
  7. Профилируйте перед оптимизацией — где реальное узкое место
  8. Избегайте вложенных Lock'ов — риск deadlock
  9. Тестируйте многопоточный код — проблемы проявляются редко
  10. Документируйте потокобезопасность — явно указывайте

Многопоточность в Python — это мощный инструмент, но требует уважения. Используйте asyncio по умолчанию для I/O, multiprocessing для CPU, и threading только в крайнем случае для блокирующего I/O, который не может быть переведён на async.

Какие особенности многопоточности в Python? | PrepBro