← Назад к вопросам

Как защититься от состояния гонки?

2.3 Middle🔥 181 комментариев
#Soft Skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Защита от состояния гонки (Race Condition)

Состояние гонки — это один из самых коварных багов в многопоточной разработке. Вот мой проверенный подход за 10+ лет.

Понимание проблемы

Состояние гонки возникает, когда два и более потока обращаются к общему ресурсу одновременно, и хотя бы один из них его изменяет:

# ❌ ПРОБЛЕМНЫЙ КОД
counter = 0

def increment():
    global counter
    temp = counter        # 1. Чтение
    temp += 1             # 2. Вычисление
    counter = temp        # 3. Запись

# Если два потока вызовут increment() одновременно:
# Поток A: читает counter (0), вычисляет (1)
# Поток B: читает counter (0), вычисляет (1)  ← Гонка!
# Поток A: записывает 1
# Поток B: записывает 1  ← Ожидали 2, получили 1

1. Базовое решение: Lock (Mutex)

Это самый простой способ — сделать блок кода атомарным:

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    with lock:  # ✅ ПРАВИЛЬНО
        temp = counter
        temp += 1
        counter = temp

# Или эквивалент:
def increment_alt():
    global counter
    lock.acquire()
    try:
        counter += 1
    finally:
        lock.release()

Плюсы: Простота, гарантированная безопасность Минусы: Performance penalty, может привести к deadlock

2. RLock (Reentrant Lock) для рекурсивного кода

Обычный Lock может привести к deadlock, если тот же поток попытается захватить его снова:

import threading

lock = threading.RLock()

def recursive_function(n):
    with lock:
        print(f"Level {n}")
        if n > 0:
            recursive_function(n - 1)  # ✅ Работает с RLock
            # С обычным Lock здесь был бы deadlock

3. Семафоры (Semaphore) для ограничения доступа

Если нужно ограничить количество потоков, обращающихся к ресурсу:

import threading

# Максимум 3 потока одновременно
sem = threading.Semaphore(3)

def access_limited_resource():
    with sem:
        print("Обращаюсь к ресурсу")
        # Одновременно только 3 потока

4. Condition Variable для координации потоков

Когда нужно синхронизировать потоки по условию:

import threading
import time

data = None
condition = threading.Condition()

def producer():
    global data
    with condition:
        data = "готовые данные"
        condition.notify_all()  # Пробуждаем ожидающие потоки

def consumer():
    global data
    with condition:
        condition.wait()  # Ждём, пока producer уведомит
        print(f"Получил: {data}")

5. Queue для безопасной передачи данных

Наилучший способ — избегать общего состояния, используя очереди:

import threading
from queue import Queue

q = Queue(maxsize=10)  # ✅ ЛУЧШИЙ ПОДХОД

def producer():
    for i in range(5):
        q.put(i)  # Потокобезопасно
        print(f"Добавил {i}")

def consumer():
    while True:
        item = q.get()  # Потокобезопасно
        print(f"Получил {item}")
        q.task_done()

Плюсы: Нет явных блокировок, более безопасно и масштабируемо

6. Атомарные операции (для простых типов)

Для простых случаев можно использовать атомарные операции:

import threading

class AtomicCounter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()
    
    def increment(self):
        with self._lock:
            self.value += 1
    
    def get(self):
        with self._lock:
            return self.value

counter = AtomicCounter()

7. asyncio для асинхронного кода (Python 3.7+)

Если вы используете асинхронный код, очень многие проблемы исчезают:

import asyncio

# ✅ БЕЗОПАСНО: asyncio работает в одном потоке
async def safe_increment():
    global counter
    counter += 1  # Никогда не будет гонки

async def main():
    tasks = [safe_increment() for _ in range(100)]
    await asyncio.gather(*tasks)

Важно: asyncio решает проблему гонки на уровне дизайна — переключение между coroutines происходит в известных точках (await).

8. Multiprocessing вместо многопоточности

Для некоторых задач безопаснее использовать отдельные процессы:

import multiprocessing

def worker(queue):
    for i in range(10):
        queue.put(i)

if __name__ == '__main__':
    q = multiprocessing.Queue()  # Безопасна между процессами
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()

9. Базы данных — встроенная защита

Для критичных операций используй БД с транзакциями:

# PostgreSQL пример
with db.transaction():
    # ✅ SELECT FOR UPDATE блокирует строку
    user = db.execute(
        "SELECT * FROM users WHERE id = %s FOR UPDATE",
        (user_id,)
    )
    balance = user['balance']
    
    if balance >= amount:
        db.execute(
            "UPDATE users SET balance = balance - %s WHERE id = %s",
            (amount, user_id)
        )

10. Практический пример: потокобезопасный счётчик

import threading
import time

class ThreadSafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()
    
    def increment(self):
        with self._lock:
            self._value += 1
    
    def decrement(self):
        with self._lock:
            self._value -= 1
    
    def get(self):
        with self._lock:
            return self._value

# Тест
counter = ThreadSafeCounter()

def worker():
    for _ in range(1000):
        counter.increment()
        counter.decrement()
        counter.increment()

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Финальное значение: {counter.get()}")  # 10000

Чеклист защиты от гонки

  1. Избегай общего состояния — лучший способ
  2. Используй Queue для передачи данных между потоками
  3. Используй asyncio вместо threads если возможно
  4. Используй Lock/RLock для критичных секций
  5. Используй Semaphore для ограничения доступа
  6. Используй БД транзакции для данных
  7. Тестируй под нагрузкой — гонки не всегда видны
  8. Документируй доступ к общему состоянию

Золотое правило

Лучший код без гонки — это код, который избегает совместного доступа к данным. Думай о потокобезопасности на уровне архитектуры, а не как об afterthought.