← Назад к вопросам

Что предотвращает использование примитивов синхронизации?

2.0 Middle🔥 121 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Что предотвращает использование примитивов синхронизации в Python

Этот вопрос о Global Interpreter Lock (GIL) и других ограничениях Python при многопоточности. Расскажу подробно.

1. Global Interpreter Lock (GIL)

GIL — это мьютекс, который позволяет только одному потоку одновременно выполнять Python байт-код. Это главное препятствие для использования примитивов синхронизации.

import threading
import time

def cpu_bound_task(n):
    """Вычислительно интенсивная задача"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# ❌ Многопоточность НЕ помогает (GIL)
start = time.time()
threads = []
for _ in range(4):
    t = threading.Thread(target=cpu_bound_task, args=(10_000_000,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"С потоками: {time.time() - start:.2f}s")  # ~4 сек (медленнее, чем без!)

# ✅ Последовательно без потоков
start = time.time()
for _ in range(4):
    cpu_bound_task(10_000_000)
print(f"Без потоков: {time.time() - start:.2f}s")  # ~2.5 сек

Почему GIL существует:

  • CPython реализован на C
  • Управление памятью через reference counting
  • Reference counting не потокобезопасен без lock'а
  • Вместо lock'а на каждый объект, один глобальный lock проще

2. Как GIL влияет на примитивы синхронизации

Lock всегда нужен (даже простые операции не atomic):

import threading

counter = 0
lock = threading.Lock()

def increment_without_lock():
    global counter
    for _ in range(1_000_000):
        counter += 1  # На самом деле это 3 операции:
                     # 1. LOAD counter
                     # 2. ADD 1
                     # 3. STORE counter

def increment_with_lock():
    global counter
    for _ in range(1_000_000):
        with lock:
            counter += 1

# БЕЗ lock'а результат неправильный
counter = 0
threads = [threading.Thread(target=increment_without_lock) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Без lock: {counter}")  # ~2M вместо 4M (race condition!)

# С lock'ом правильно, но ОЧЕНЬ медленно
counter = 0
threads = [threading.Thread(target=increment_with_lock) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"С lock: {counter}")  # 4M (правильно!)

3. Примитивы синхронизации в Python

import threading
from threading import Lock, RLock, Semaphore, Event, Condition
import queue

# 1. Lock (простой мьютекс)
lock = threading.Lock()

with lock:
    # Критическая секция
    shared_resource.update()

# 2. RLock (переиспользуемый lock для одного потока)
rlock = threading.RLock()

with rlock:
    with rlock:  # ✅ Один и тот же поток может захватить дважды
        pass

# 3. Semaphore (счётчик для ограничения доступа)
semaphore = threading.Semaphore(3)  # Максимум 3 потока

with semaphore:
    # Не более 3 потоков одновременно
    expensive_operation()

# 4. Event (сигнал между потоками)
event = threading.Event()

def waiter():
    event.wait()  # Ждать сигнал
    print("Получил сигнал!")

def signaler():
    time.sleep(1)
    event.set()  # Отправить сигнал

# 5. Condition (для complex synchronization)
data = []
condition = threading.Condition()

def producer():
    global data
    with condition:
        data.append(random.randint(1, 10))
        condition.notify()  # Уведомить consumers

def consumer():
    global data
    with condition:
        condition.wait()  # Ждать уведомления
        if data:
            print(f"Потреблю: {data.pop()}")

# 6. Queue (thread-safe очередь)
q = queue.Queue(maxsize=10)

def producer_queue():
    for i in range(5):
        q.put(i)  # Thread-safe

def consumer_queue():
    while True:
        item = q.get()  # Блокирует, если пусто
        print(f"Потреблю: {item}")
        q.task_done()  # Сигнал о завершении

4. Почему примитивы синхронизации не помогают для CPU-bound задач

# Проблема: GIL выпускается только при:
# 1. Блокирующих операциях (I/O, sleep)
# 2. Каждые N инструкций (check_interval)
# 3. Явном yield'е

import threading
import time

def task_with_io():
    """I/O задача — здесь GIL отпускается"""
    for _ in range(5):
        time.sleep(1)  # ← GIL отпускается
        print(f"Поток {threading.current_thread().name}")

def task_with_cpu():
    """CPU задача — GIL не отпускается"""
    result = 0
    for i in range(100_000_000):
        result += i ** 2
    return result

# Для I/O задач многопоточность работает отлично
start = time.time()
threads = []
for i in range(4):
    t = threading.Thread(target=task_with_io)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"I/O с потоками: {time.time() - start:.2f}s")  # ~5 сек (параллельно)

# Для CPU задач многопоточность медленнее
start = time.time()
for _ in range(4):
    task_with_cpu()
print(f"CPU без потоков: {time.time() - start:.2f}s")  # ~5 сек

start = time.time()
threads = []
for _ in range(4):
    t = threading.Thread(target=task_with_cpu)
    threads.append(t)
    t.start()

for t in threads:
    t.join()
print(f"CPU с потоками: {time.time() - start:.2f}s")  # ~8+ сек (медленнее!)

5. Альтернативы примитивам синхронизации

Вариант 1: Асинхронность вместо многопоточности

import asyncio

async def io_task():
    """Асинхронная задача — идеально для I/O"""
    await asyncio.sleep(1)
    return "результат"

async def main():
    # Выполнить 4 задачи параллельно
    results = await asyncio.gather(
        io_task(),
        io_task(),
        io_task(),
        io_task()
    )
    print(results)

# Выполнится в ~1 сек (не 4!), вместо потоков
asyncio.run(main())

# Для FastAPI
from fastapi import FastAPI
import aiohttp

app = FastAPI()

@app.get("/parallel")
async def parallel_requests():
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.get(f"https://api.example.com/item/{i}")
            for i in range(10)
        ]
        results = await asyncio.gather(*tasks)
        return results

Вариант 2: Multiprocessing для CPU-bound

from multiprocessing import Pool, Process
import os

def cpu_bound_task(n):
    """Вычисления (каждый процесс имеет свой GIL)"""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

# ✅ Multiprocessing обходит GIL (разные процессы)
if __name__ == "__main__":
    with Pool(processes=4) as pool:
        results = pool.map(cpu_bound_task, [10_000_000] * 4)
        print(f"Multiprocessing результаты: {results}")
        # Выполнится примерно в 4 раза быстрее!

Вариант 3: Celery для распределённых задач

from celery import Celery

app = Celery("tasks", broker="redis://localhost")

@app.task
def expensive_task(data):
    """Долгая задача в отдельном процессе"""
    time.sleep(10)
    return f"Обработано: {data}"

# В главном приложении
result = expensive_task.delay("data")  # Не блокирует!
result.get()  # Получить результат

6. Реальный пример: когда примитивы помогают

import threading
import queue
import time

class WebCrawler:
    def __init__(self, num_workers=4):
        self.queue = queue.Queue()  # Thread-safe очередь
        self.lock = threading.Lock()  # Для общих ресурсов
        self.results = []  # Защищён lock'ом
        self.num_workers = num_workers
    
    def worker(self):
        while True:
            # Queue.get() блокирует, если пусто
            url = self.queue.get()
            
            if url is None:  # Сигнал выхода
                break
            
            try:
                # Здесь I/O — GIL отпускается
                data = self.fetch_url(url)
                
                # Защита при записи общего ресурса
                with self.lock:
                    self.results.append(data)
            finally:
                self.queue.task_done()
    
    def crawl(self, urls):
        # Запустить workers
        threads = []
        for _ in range(self.num_workers):
            t = threading.Thread(target=self.worker)
            threads.append(t)
            t.start()
        
        # Добавить URLs в очередь
        for url in urls:
            self.queue.put(url)
        
        # Дождаться завершения
        self.queue.join()
        
        # Остановить workers
        for _ in range(self.num_workers):
            self.queue.put(None)
        
        for t in threads:
            t.join()
        
        return self.results
    
    def fetch_url(self, url):
        # Имитация I/O (в реальности requests.get)
        time.sleep(1)
        return f"Data from {url}"

# Использование
crawler = WebCrawler(num_workers=4)
urls = [f"https://example.com/{i}" for i in range(10)]
results = crawler.crawl(urls)
print(f"Получено {len(results)} результатов за ~3 сек")

Сравнение подходов

┌──────────────────┬─────────────┬──────────────────┐
│   Тип задачи     │ Threading   │ Asyncio/Multiproc│
├──────────────────┼─────────────┼──────────────────┤
│ I/O bound        │ ✅ Хорошо   │ ✅ Отлично       │
│ (HTTP, DB)       │             │                  │
├──────────────────┼─────────────┼──────────────────┤
│ CPU bound        │ ❌ Плохо    │ ✅ Хорошо        │
│ (вычисления)     │ (GIL!)      │ (разные процессы)│
├──────────────────┼─────────────┼──────────────────┤
│ Mixed            │ ⚠️  Сложно  │ ✅ Хорошо        │
└──────────────────┴─────────────┴──────────────────┘

Вывод

GIL предотвращает использование примитивов синхронизации для:

  1. CPU-bound задач — многопоточность не помогает, а наоборот замедляет
  2. Оптимизации — даже с lock'ами код не будет параллельным

Примитивы синхронизации полезны для:

  1. I/O-bound задач — потокобезопасность и координация потоков
  2. Защиты общих ресурсов — race condition'ы
  3. Синхронизации потоков — координация между ними

Альтернативы:

  • asyncio для I/O без overhead потоков
  • multiprocessing для CPU без GIL
  • Celery для распределённых вычислений

Python 3.13+ работает над удалением GIL, что кардинально изменит ситуацию с многопоточностью и примитивами синхронизации.