← Назад к вопросам

Как бороться с состоянием гонки (Race condition) в многопоточности?

2.7 Senior🔥 201 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Состояния гонки (Race Conditions) в многопоточности: стратегии борьбы

Состояние гонки возникает, когда два или более потока получают доступ к общему ресурсу одновременно, и результат зависит от порядка выполнения. Это одна из самых коварных ошибок. Расскажу о проверенных подходах.

1. Mutexes (Мьютексы) и Locks

Самый базовый инструмент — синхронизация с помощью блокировок. В Python это threading.Lock:

import threading

class BankAccount:
    def __init__(self, balance: int):
        self.balance = balance
        self.lock = threading.Lock()
    
    def withdraw(self, amount: int) -> bool:
        with self.lock:  # Критическая секция защищена
            if self.balance >= amount:
                self.balance -= amount
                return True
            return False

Плюсы: простая и универсальная Минусы: может привести к deadlock'ам, медленнее, может быть bottleneck

2. RLock (Рекурсивная блокировка)

Если один поток должен несколько раз захватить одну блокировку:

lock = threading.RLock()

def outer():
    with lock:
        inner()  # Тот же поток может захватить ещё раз

def inner():
    with lock:
        pass

3. Семафоры (Semaphores)

Для контроля доступа нескольких потоков к ресурсу:

semaphore = threading.Semaphore(3)  # Максимум 3 потока

def access_resource():
    with semaphore:
        print("Поток получил доступ")
        # использование ресурса

4. Queue — самый Pythonic способ

Для передачи данных между потоками используй queue.Queue. Она потокобезопасна по умолчанию:

import queue
import threading

q = queue.Queue()

def producer():
    for i in range(5):
        q.put(i)  # Безопасно добавляет данные

def consumer():
    while True:
        item = q.get()  # Блокирующая операция
        if item is None:
            break
        print(f"Обработал {item}")
        q.task_done()

prod = threading.Thread(target=producer)
cons = threading.Thread(target=consumer)
prod.start()
cons.start()
q.put(None)  # Сигнал завершения
cons.join()

5. asyncio вместо многопоточности

В Python для большинства задач asyncio лучше, чем многопоточность. Нет race condition'ов по определению:

import asyncio

class AsyncCounter:
    def __init__(self):
        self.count = 0
    
    async def increment(self):
        # Нет race condition'ов! asyncio single-threaded
        self.count += 1
        await asyncio.sleep(0.1)
        self.count += 1

async def main():
    counter = AsyncCounter()
    await asyncio.gather(
        counter.increment(),
        counter.increment(),
        counter.increment()
    )
    print(counter.count)  # Всегда 6 (гарантировано)

asyncio.run(main())

6. Atom операции (Atomic Operations)

Некоторые операции в Python уже атомарны благодаря GIL (Global Interpreter Lock). Например, присваивание простой переменной:

# Это безопасно благодаря GIL
self.value = 42  # Атомарно

# Это НЕ безопасно!
self.value += 1  # LOAD + ADD + STORE — три операции!

7. threading.Condition для сигнализации

Для случаев, когда потокам нужно ждать события:

condition = threading.Condition()
data_ready = False
data = None

def producer():
    global data_ready, data
    with condition:
        data = calculate_data()
        data_ready = True
        condition.notify_all()  # Разбудить все потоки

def consumer():
    global data_ready, data
    with condition:
        condition.wait_for(lambda: data_ready)  # Ждать пока флаг не true
        process(data)

8. Избегай race condition'ов архитектурно

Так как race condition'ы сложно отлаживать, лучше избежать многопоточности вообще:

  • Используй asyncio для I/O операций
  • Используй multiprocessing для CPU-bound задач (каждый процесс имеет свой GIL)
  • Сводить общее состояние к минимуму
  • Используй immutable структуры данных

9. Примеры на практике

Плохо — классический race condition:

balance = 100

# Поток 1
balance = balance - 50  # LOAD (100) - 50 = 50, STORE

# Поток 2 (одновременно)
balance = balance - 30  # LOAD (100) - 30 = 70, STORE

# Результат: 70 вместо 20!

Хорошо — с синхронизацией:

from threading import Lock

balance = 100
lock = Lock()

with lock:
    balance -= 50  # Гарантировано атомарно

Итого: мой выбор

  1. Первый выбор: Переписать код на asyncio — нет многопоточности, нет проблем
  2. Если нужна многопоточность: Queue для обмена данными
  3. Если общее состояние неизбежно: Lock + тщательное тестирование
  4. Для CPU-bound: multiprocessing с отдельными процессами
Как бороться с состоянием гонки (Race condition) в многопоточности? | PrepBro