← Назад к вопросам

Являются ли потоки потокобезопасными в Python

2.3 Middle🔥 271 комментариев
#Тестирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Потокобезопасность потоков в Python

Потоки в Python НЕ являются полностью потокобезопасными по умолчанию. Хотя GIL (Global Interpreter Lock) обеспечивает определённый уровень защиты для встроенных типов данных, это не гарантирует полную потокобезопасность при параллельном доступе к данным из нескольких потоков. Это требует явной синхронизации с помощью примитивов синхронизации.

GIL (Global Interpreter Lock) и его роль

GIL — это мьютекс, который защищает доступ к объектам Python во время выполнения байт-кода. Он существует только в CPython.

import threading

# GIL гарантирует, что в один момент времени только один поток
# выполняет код Python (байт-код)

def increment_counter(counter, iterations):
    for _ in range(iterations):
        counter['value'] += 1

counter = {'value': 0}
threads = []

# Создаём 10 потоков, каждый инкрементирует счётчик 100000 раз
for _ in range(10):
    t = threading.Thread(target=increment_counter, args=(counter, 100000))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Ожидаемый результат: {10 * 100000}")
print(f"Фактический результат: {counter['value']}")
# Результат обычно меньше, чем ожидается! Race condition!

Проблема: Race Condition

Операция counter += 1 НЕ является атомарной. Она состоит из трёх шагов:

# Что происходит когда мы пишем: counter['value'] += 1

# 1. Прочитать текущее значение
temp = counter['value']  # Прочитать

# 2. Увеличить значение
temp = temp + 1         # Модифицировать

# 3. Записать новое значение
counter['value'] = temp # Записать

# Между этими шагами другой поток может вмешаться!
# Поток 1: читает 5
# Поток 2: читает 5 (не 6!)
# Поток 1: пишет 6
# Поток 2: пишет 6 (не 7!)

Явная синхронизация с Lock

Для потокобезопасности используй явные примитивы синхронизации:

import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()  # Создаём мьютекс
    
    def increment(self):
        with self.lock:  # Критическая секция
            self.value += 1

def increment_counter(counter, iterations):
    for _ in range(iterations):
        counter.increment()

counter = Counter()
threads = []

for _ in range(10):
    t = threading.Thread(target=increment_counter, args=(counter, 100000))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Результат с Lock: {counter.value}")  # 1000000 — Верно!

Другие примитивы синхронизации

RLock (Reentrant Lock)

import threading

lock = threading.RLock()  # Один и тот же поток может захватить несколько раз

def function1():
    with lock:
        print("Function 1")
        function2()  # Рекурсивный захват — OK с RLock

def function2():
    with lock:
        print("Function 2")

thread = threading.Thread(target=function1)
thread.start()
thread.join()

Semaphore

import threading

semaphore = threading.Semaphore(3)  # Максимум 3 потока одновременно

def worker(worker_id):
    with semaphore:
        print(f"Worker {worker_id} работает")
        # Только 3 работника одновременно

threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Event

import threading
import time

event = threading.Event()  # Сигнальный механизм

def waiter():
    print("Waiter: ожидаю события")
    event.wait()  # Блокируется до .set()
    print("Waiter: событие произошло!")

def setter():
    time.sleep(2)
    print("Setter: устанавливаю событие")
    event.set()

waiter_thread = threading.Thread(target=waiter)
setter_thread = threading.Thread(target=setter)

waiter_thread.start()
setter_thread.start()

waiter_thread.join()
setter_thread.join()

Condition

import threading

condition = threading.Condition()
data = []

def producer():
    global data
    for i in range(5):
        with condition:
            data.append(i)
            print(f"Produced: {i}")
            condition.notify_all()  # Уведомляем ожидающие потоки

def consumer():
    global data
    for _ in range(5):
        with condition:
            while not data:  # Ждём данные
                condition.wait()
            item = data.pop(0)
            print(f"Consumed: {item}")

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

ThreadLocal данные

Для хранения данных, уникальных для каждого потока:

import threading

thread_local = threading.local()

def worker(worker_id):
    # Каждый поток имеет собственное значение
    thread_local.data = f"Worker {worker_id}"
    print(f"Поток {worker_id}: {thread_local.data}")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Queue — потокобезопасная очередь

import threading
from queue import Queue

# Queue автоматически потокобезопасна
queue = Queue(maxsize=10)

def producer():
    for i in range(5):
        queue.put(i)  # Потокобезопасно
        print(f"Produced: {i}")

def consumer():
    for _ in range(5):
        item = queue.get()  # Потокобезопасно
        print(f"Consumed: {item}")
        queue.task_done()  # Отмечаем, что задача выполнена

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

Проблема с копией данных

import threading

data = [1, 2, 3, 4, 5]
result = []

# Копирование списка НЕ атомарно!
def worker():
    with threading.Lock():  # Нужна синхронизация
        result.extend(data.copy())

thread = threading.Thread(target=worker)
thread.start()
thread.join()

Встроенные типы данных и потокобезопасность

# Некоторые встроенные операции относительно безопасны благодаря GIL:
my_list = []
my_dict = {}
my_set = set()

# Атомарные операции:
my_list.append(1)          # Безопасно
my_dict['key'] = 'value'   # Безопасно
my_set.add(1)              # Безопасно
x = my_list[0]             # Безопасно

# Но последовательности операций не безопасны:
if len(my_list) > 0:
    item = my_list[0]  # Между проверкой и доступом могут быть проблемы

Лучшие практики

  1. Используй Queue для передачи данных между потоками — это потокобезопасно
  2. Используй Lock для критических секций — для защиты общих данных
  3. Используй asyncio вместо threading — для I/O операций
  4. Используй multiprocessing для CPU-bound задач — чтобы обойти GIL
  5. Избегай общих данных — проще всего не делиться состоянием
  6. Документируй потокобезопасность — явно указывай, какие методы потокобезопасны

asyncio как альтернатива

import asyncio

async def task(task_id):
    print(f"Task {task_id} started")
    await asyncio.sleep(1)
    print(f"Task {task_id} done")

async def main():
    # Асинхронность без потоков — нет race conditions!
    await asyncio.gather(
        task(1),
        task(2),
        task(3),
    )

asyncio.run(main())

Выводы

  • Потоки НЕ потокобезопасны по умолчанию — нужна явная синхронизация
  • GIL обеспечивает защиту для отдельных байт-кодов, но не для последовательностей операций
  • Используй Lock, RLock, Semaphore, Event, Condition для синхронизации
  • Queue — потокобезопасная структура данных
  • asyncio лучше для I/O операций — не требует явной синхронизации
  • multiprocessing лучше для CPU-bound операций — обходит GIL
Являются ли потоки потокобезопасными в Python | PrepBro