← Назад к вопросам

Является ли список потокобезопасным?

2.0 Middle🔥 161 комментариев
#DevOps и инфраструктура#Django

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Потокобезопасность списка в Python

Коротко: Список в Python частично потокобезопасен благодаря GIL (Global Interpreter Lock), но это не гарантирует безопасность в многопоточной среде. Нужно применять дополнительные механизмы синхронизации для надёжной работы.

Что означает потокобезопасность

Потокобезопасная (thread-safe) структура данных — это структура, которая может быть безопасно использована несколькими потоками одновременно без повреждения данных.

from threading import Thread
import time

shared_list = []

def append_numbers():
    for i in range(100):
        shared_list.append(i)

# Запустим 5 потоков одновременно
threads = [Thread(target=append_numbers) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(shared_list))  # Должно быть 500, но может быть меньше!

GIL и Python

В CPython (стандартная реализация) есть GIL (Global Interpreter Lock) — глобальная блокировка, которая позволяет только одному потоку выполнять Python код в момент времени.

Внутри GIL некоторые операции атомарны (неделимы):

# Эти операции обычно безопасны:
my_list.append(item)      # Одна операция
my_list.pop()             # Одна операция
my_list[0] = value        # Одна операция
len(my_list)              # Одна операция

Опасные ситуации

1. Операции, состоящие из нескольких шагов:

from threading import Thread
import time

my_list = [1, 2, 3]

def risky_operation():
    # Это НЕ атомарно!
    if len(my_list) > 0:  # Шаг 1: проверка длины
        time.sleep(0.0001)  # Контекст переключился
        value = my_list[0]  # Шаг 2: чтение элемента (может быть ошибка!)
        print(value)

threads = [Thread(target=risky_operation) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

2. Итерация во время изменения:

from threading import Thread

my_list = list(range(1000))

def iterate():
    for item in my_list:  # Может вызвать RuntimeError
        print(item)

def modify():
    for i in range(500):
        my_list.append(i)

t1 = Thread(target=iterate)
t2 = Thread(target=modify)

t1.start()
t2.start()
t1.join()
t2.join()

Ошибка: RuntimeError: list changed size during iteration

3. Комплексные операции:

from threading import Thread

my_list = [1, 2, 3]
results = []

def complex_operation():
    # Это не атомарно!
    max_val = max(my_list)  # Может быть исключение если список пуст
    my_list.append(max_val + 1)  # А список изменился
    results.append(max_val)

threads = [Thread(target=complex_operation) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Решения: потокобезопасность

1. Lock (Mutex) — базовая блокировка:

from threading import Thread, Lock

my_list = []
lock = Lock()

def append_safely():
    for i in range(100):
        with lock:  # Захватываем блокировку
            my_list.append(i)

threads = [Thread(target=append_safely) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(my_list))  # Гарантировано 500

2. RLock (Recursive Lock) — для вложенных блокировок:

from threading import Thread, RLock

my_list = []
rlock = RLock()

def recursive_operation():
    with rlock:
        my_list.append(1)
        with rlock:  # Один и тот же поток может захватить снова
            my_list.append(2)

t = Thread(target=recursive_operation)
t.start()
t.join()

3. Semaphore — ограничение числа одновременных доступов:

from threading import Thread, Semaphore

my_list = []
sem = Semaphore(3)  # Максимум 3 потока одновременно

def limited_access():
    with sem:
        my_list.append(1)

threads = [Thread(target=limited_access) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

4. queue.Queue — специализирована для потокобезопасной очереди:

from threading import Thread
from queue import Queue

q = Queue()

def producer():
    for i in range(100):
        q.put(i)  # Потокобезопасно

def consumer():
    while True:
        item = q.get()  # Потокобезопасно
        if item is None:
            break
        print(item)

p = Thread(target=producer)
c = Thread(target=consumer)

p.start()
c.start()

p.join()
q.put(None)  # Сигнал завершения
c.join()

5. threading.local() — локальные данные потока:

from threading import Thread, local

thread_data = local()

def thread_safe_operation():
    thread_data.my_list = []  # Каждый поток имеет свой список
    thread_data.my_list.append(1)
    print(thread_data.my_list)

threads = [Thread(target=thread_safe_operation) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

6. Использование Condition для синхронизации:

from threading import Thread, Condition

cv = Condition()
data = []

def producer():
    for i in range(10):
        with cv:
            data.append(i)
            cv.notify_all()  # Уведомить потребителей

def consumer():
    with cv:
        cv.wait()  # Ждать уведомления
        print(data)

p = Thread(target=producer)
c = Thread(target=consumer)

p.start()
c.start()
p.join()
c.join()

Таблица безопасности операций

ОперацияБезопаснаПримечание
list.append()ДаАтомарная операция
list.pop()ДаАтомарная операция
list[i] = xДаАтомарная операция
list[i]ДаЧтение безопасно
for item in list:НетИзменение во время итерации
max(list)НетСписок может измениться
len(list)ДаАтомарная операция

Правило большого пальца

Если список изменяется во время использования в многопоточной среде — ВСЕГДА используйте Lock.

# Плохо
for item in my_list:
    process(item)  # Если my_list меняется в другом потоке — ошибка

# Хорошо
with lock:
    for item in my_list:
        process(item)

Заключение

  • GIL обеспечивает базовую безопасность для простых операций
  • Комплексные операции ТРЕБУЮТ явной синхронизации
  • queue.Queue — предпочтительный выбор для обмена данными между потоками
  • При сомнениях — используйте Lock
  • Рассмотрите многопроцессность (multiprocessing) для CPU-bound операций