← Назад к вопросам
Являются ли потоки потокобезопасными в Python
2.3 Middle🔥 271 комментариев
#Тестирование
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Потокобезопасность потоков в Python
Потоки в Python НЕ являются полностью потокобезопасными по умолчанию. Хотя GIL (Global Interpreter Lock) обеспечивает определённый уровень защиты для встроенных типов данных, это не гарантирует полную потокобезопасность при параллельном доступе к данным из нескольких потоков. Это требует явной синхронизации с помощью примитивов синхронизации.
GIL (Global Interpreter Lock) и его роль
GIL — это мьютекс, который защищает доступ к объектам Python во время выполнения байт-кода. Он существует только в CPython.
import threading
# GIL гарантирует, что в один момент времени только один поток
# выполняет код Python (байт-код)
def increment_counter(counter, iterations):
for _ in range(iterations):
counter['value'] += 1
counter = {'value': 0}
threads = []
# Создаём 10 потоков, каждый инкрементирует счётчик 100000 раз
for _ in range(10):
t = threading.Thread(target=increment_counter, args=(counter, 100000))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Ожидаемый результат: {10 * 100000}")
print(f"Фактический результат: {counter['value']}")
# Результат обычно меньше, чем ожидается! Race condition!
Проблема: Race Condition
Операция counter += 1 НЕ является атомарной. Она состоит из трёх шагов:
# Что происходит когда мы пишем: counter['value'] += 1
# 1. Прочитать текущее значение
temp = counter['value'] # Прочитать
# 2. Увеличить значение
temp = temp + 1 # Модифицировать
# 3. Записать новое значение
counter['value'] = temp # Записать
# Между этими шагами другой поток может вмешаться!
# Поток 1: читает 5
# Поток 2: читает 5 (не 6!)
# Поток 1: пишет 6
# Поток 2: пишет 6 (не 7!)
Явная синхронизация с Lock
Для потокобезопасности используй явные примитивы синхронизации:
import threading
class Counter:
def __init__(self):
self.value = 0
self.lock = threading.Lock() # Создаём мьютекс
def increment(self):
with self.lock: # Критическая секция
self.value += 1
def increment_counter(counter, iterations):
for _ in range(iterations):
counter.increment()
counter = Counter()
threads = []
for _ in range(10):
t = threading.Thread(target=increment_counter, args=(counter, 100000))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Результат с Lock: {counter.value}") # 1000000 — Верно!
Другие примитивы синхронизации
RLock (Reentrant Lock)
import threading
lock = threading.RLock() # Один и тот же поток может захватить несколько раз
def function1():
with lock:
print("Function 1")
function2() # Рекурсивный захват — OK с RLock
def function2():
with lock:
print("Function 2")
thread = threading.Thread(target=function1)
thread.start()
thread.join()
Semaphore
import threading
semaphore = threading.Semaphore(3) # Максимум 3 потока одновременно
def worker(worker_id):
with semaphore:
print(f"Worker {worker_id} работает")
# Только 3 работника одновременно
threads = []
for i in range(10):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
Event
import threading
import time
event = threading.Event() # Сигнальный механизм
def waiter():
print("Waiter: ожидаю события")
event.wait() # Блокируется до .set()
print("Waiter: событие произошло!")
def setter():
time.sleep(2)
print("Setter: устанавливаю событие")
event.set()
waiter_thread = threading.Thread(target=waiter)
setter_thread = threading.Thread(target=setter)
waiter_thread.start()
setter_thread.start()
waiter_thread.join()
setter_thread.join()
Condition
import threading
condition = threading.Condition()
data = []
def producer():
global data
for i in range(5):
with condition:
data.append(i)
print(f"Produced: {i}")
condition.notify_all() # Уведомляем ожидающие потоки
def consumer():
global data
for _ in range(5):
with condition:
while not data: # Ждём данные
condition.wait()
item = data.pop(0)
print(f"Consumed: {item}")
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
ThreadLocal данные
Для хранения данных, уникальных для каждого потока:
import threading
thread_local = threading.local()
def worker(worker_id):
# Каждый поток имеет собственное значение
thread_local.data = f"Worker {worker_id}"
print(f"Поток {worker_id}: {thread_local.data}")
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
Queue — потокобезопасная очередь
import threading
from queue import Queue
# Queue автоматически потокобезопасна
queue = Queue(maxsize=10)
def producer():
for i in range(5):
queue.put(i) # Потокобезопасно
print(f"Produced: {i}")
def consumer():
for _ in range(5):
item = queue.get() # Потокобезопасно
print(f"Consumed: {item}")
queue.task_done() # Отмечаем, что задача выполнена
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
Проблема с копией данных
import threading
data = [1, 2, 3, 4, 5]
result = []
# Копирование списка НЕ атомарно!
def worker():
with threading.Lock(): # Нужна синхронизация
result.extend(data.copy())
thread = threading.Thread(target=worker)
thread.start()
thread.join()
Встроенные типы данных и потокобезопасность
# Некоторые встроенные операции относительно безопасны благодаря GIL:
my_list = []
my_dict = {}
my_set = set()
# Атомарные операции:
my_list.append(1) # Безопасно
my_dict['key'] = 'value' # Безопасно
my_set.add(1) # Безопасно
x = my_list[0] # Безопасно
# Но последовательности операций не безопасны:
if len(my_list) > 0:
item = my_list[0] # Между проверкой и доступом могут быть проблемы
Лучшие практики
- Используй Queue для передачи данных между потоками — это потокобезопасно
- Используй Lock для критических секций — для защиты общих данных
- Используй asyncio вместо threading — для I/O операций
- Используй multiprocessing для CPU-bound задач — чтобы обойти GIL
- Избегай общих данных — проще всего не делиться состоянием
- Документируй потокобезопасность — явно указывай, какие методы потокобезопасны
asyncio как альтернатива
import asyncio
async def task(task_id):
print(f"Task {task_id} started")
await asyncio.sleep(1)
print(f"Task {task_id} done")
async def main():
# Асинхронность без потоков — нет race conditions!
await asyncio.gather(
task(1),
task(2),
task(3),
)
asyncio.run(main())
Выводы
- Потоки НЕ потокобезопасны по умолчанию — нужна явная синхронизация
- GIL обеспечивает защиту для отдельных байт-кодов, но не для последовательностей операций
- Используй Lock, RLock, Semaphore, Event, Condition для синхронизации
- Queue — потокобезопасная структура данных
- asyncio лучше для I/O операций — не требует явной синхронизации
- multiprocessing лучше для CPU-bound операций — обходит GIL