Как решить проблему, когда все потоки имеют доступ к глобальной переменной?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Как решить проблему, когда все потоки имеют доступ к глобальной переменной?
Это проблема race conditions (условий гонки). Когда несколько потоков одновременно изменяют глобальную переменную, результат становится непредсказуемым.
Проблема: Race Condition
Демонстрация проблемы
import threading
import time
# Глобальная переменная
counter = 0
def increment():
global counter
for _ in range(100000):
counter += 1 # ❌ Race condition!
# Создаём 5 потоков
threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Counter: {counter}")
# Ожидаем: 500000 (5 потоков × 100000)
# Получаем: ~470000 (непредсказуемо)
Почему это происходит?
Операция counter += 1 на самом деле:
# Это НЕ атомарная операция!
temp = counter # 1. Прочитали
temp = temp + 1 # 2. Увеличили
counter = temp # 3. Записали
# Если два потока одновременно:
Thread 1: counter = 5
Thread 2: counter = 5
Thread 1: temp1 = 5 → temp1 = 6 → counter = 6
Thread 2: temp2 = 5 → temp2 = 6 → counter = 6 (должна быть 7!)
Решение 1: Lock (Мьютекс)
Самый простой способ — использовать Lock для синхронизации доступа:
import threading
counter = 0
lock = threading.Lock() # Создаём lock
def increment():
global counter
for _ in range(100000):
with lock: # Критическая секция
counter += 1
threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Counter: {counter}") # 500000 ✅
Как работает Lock:
Текущий счёт: counter = 100
Thread 1: lock.acquire() ✓ Вошли
Thread 2: lock.acquire() ✗ Ждём (Thread 1 держит)
Thread 3: lock.acquire() ✗ Ждём (Thread 1 держит)
Thread 1: counter = 101 → lock.release() Выходим
Thread 2: lock.acquire() ✓ Вошли
Thread 3: lock.acquire() ✗ Ждём (Thread 2 держит)
Thread 2: counter = 102 → lock.release() Выходим
Решение 2: RLock (Reentrant Lock)
Если один поток может вызвать функцию рекурсивно:
import threading
counter = 0
lock = threading.RLock() # RLock для рекурсии
def update_counter():
with lock:
global counter
counter += 1
if counter < 10:
update_counter() # Рекурсивный вызов
thread = threading.Thread(target=update_counter)
thread.start()
thread.join()
print(f"Counter: {counter}") # 10
Решение 3: Semaphore
Для контроля количества потоков, имеющих доступ:
import threading
import time
# Максимум 3 потока одновременно
semaphore = threading.Semaphore(3)
def access_resource(thread_id):
with semaphore:
print(f"Thread {thread_id} accessing")
time.sleep(1) # Имитация работы
print(f"Thread {thread_id} done")
threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
Решение 4: Condition Variable
Для синхронизации и сигналирования между потоками:
import threading
import time
condition = threading.Condition()
data = []
def producer():
for i in range(5):
with condition:
data.append(i)
print(f"Produced: {i}")
condition.notify() # Оповещаем потребителей
time.sleep(0.1)
def consumer():
while True:
with condition:
if not data:
condition.wait() # Ждём оповещения
if data:
item = data.pop(0)
print(f"Consumed: {item}")
if item == 4:
break
time.sleep(0.2)
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
Решение 5: Thread-local Storage
Если каждому потоку нужна своя копия переменной:
import threading
# Каждый поток имеет свою копию переменной
thread_local = threading.local()
def set_and_print():
thread_local.value = threading.current_thread().name
print(f"Value: {thread_local.value}")
threads = [threading.Thread(target=set_and_print, name=f"Thread-{i}") for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
# Вывод:
# Value: Thread-0
# Value: Thread-1
# Value: Thread-2
# Каждый поток имеет свой value!
Решение 6: Queue (очередь)
Для безопасной передачи данных между потоками:
import threading
import queue
import time
# Thread-safe очередь
task_queue = queue.Queue()
def producer():
for i in range(5):
task_queue.put(f"Task {i}") # Потокобезопасно
time.sleep(0.1)
def consumer():
while True:
try:
task = task_queue.get(timeout=1) # Потокобезопасно
print(f"Processing: {task}")
task_queue.task_done()
if "Task 4" in task:
break
except queue.Empty:
break
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
Решение 7: Избежать глобального состояния
Лучшее решение — исключить глобальную переменную:
# ❌ Плохо — глобальное состояние
shared_data = {"count": 0}
def increment():
global shared_data
shared_data["count"] += 1 # Race condition!
# ✅ Хорошо — передаём состояние параметром
class Counter:
def __init__(self):
self.value = 0
self.lock = threading.Lock()
def increment(self):
with self.lock:
self.value += 1
counter = Counter()
def worker():
for _ in range(1000):
counter.increment()
threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter.value) # 5000 ✅
Решение 8: Атомарные операции
Для простых операций используйте готовые потокобезопасные классы:
import threading
from threading import Lock
class AtomicCounter:
def __init__(self):
self.value = 0
self._lock = Lock()
def increment(self, delta=1):
with self._lock:
self.value += delta
return self.value
def get(self):
with self._lock:
return self.value
counter = AtomicCounter()
def worker():
for _ in range(1000):
counter.increment()
threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter.get()) # 5000 ✅
Таблица решений
| Проблема | Решение | Пример |
|---|---|---|
| Одновременный доступ к переменной | Lock / RLock | with lock: counter += 1 |
| Ограничить N потоков | Semaphore | semaphore = Semaphore(3) |
| Синхронизация потоков | Condition | condition.wait() / notify() |
| Каждому потоку свои данные | ThreadLocal | threading.local() |
| Передача данных между потоками | Queue | queue.Queue() |
| Полностью убрать глобальное состояние | Класс с методами | counter.increment() |
| Простой счётчик | AtomicCounter класс | Можно использовать из Numba |
Лучшая практика
Избегайте глобального состояния вообще:
# Вместо глобальных переменных используйте DI (Dependency Injection)
class Application:
def __init__(self):
self.counter = Counter()
self.lock = threading.Lock()
def process(self):
with self.lock:
self.counter.value += 1
app = Application()
def worker(app):
for _ in range(1000):
app.process()
threads = [threading.Thread(target=worker, args=(app,)) for _ in range(5)]
Вывод: Lock — самое универсальное решение для защиты доступа, но лучше всего архитектурно исключить глобальное состояние с самого начала.