← Назад к вопросам

Как сделать потокобезопасный словарь?

2.2 Middle🔥 71 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как сделать потокобезопасный словарь

Обычный Python словарь (dict) не является потокобезопасным. При одновременном доступе из нескольких потоков может произойти повреждение данных. Существует несколько подходов для создания потокобезопасного словаря.

1. threading.Lock — базовый подход

Оберните обычный dict в класс с использованием мьютекса:

import threading
from typing import Any, Optional

class ThreadSafeDict:
    def __init__(self):
        self._dict = {}
        self._lock = threading.Lock()
    
    def get(self, key: str, default: Any = None) -> Any:
        """Безопасно получить значение"""
        with self._lock:
            return self._dict.get(key, default)
    
    def set(self, key: str, value: Any) -> None:
        """Безопасно установить значение"""
        with self._lock:
            self._dict[key] = value
    
    def delete(self, key: str) -> None:
        """Безопасно удалить значение"""
        with self._lock:
            if key in self._dict:
                del self._dict[key]
    
    def items(self):
        """Безопасно получить все элементы"""
        with self._lock:
            return list(self._dict.items())
    
    def __contains__(self, key: str) -> bool:
        """Проверить наличие ключа"""
        with self._lock:
            return key in self._dict
    
    def __len__(self) -> int:
        """Получить размер"""
        with self._lock:
            return len(self._dict)

# Использование
safe_dict = ThreadSafeDict()
safe_dict.set("name", "Alice")
print(safe_dict.get("name"))  # Alice

Плюсы:

  • Простой и понятный код
  • Полный контроль
  • Работает везде

Минусы:

  • Нужно вручную оборачивать все операции
  • Может быть медленным при частых блокировках
  • Не поддерживает обычный dict интерфейс

2. threading.RLock — для рекурсивных операций

Eсли одному потоку нужно несколько раз захватить блокировку:

import threading

class RecursiveThreadSafeDict:
    def __init__(self):
        self._dict = {}
        self._lock = threading.RLock()  # Рекурсивный мьютекс
    
    def set(self, key, value):
        with self._lock:
            self._dict[key] = value
    
    def get(self, key, default=None):
        with self._lock:
            return self._dict.get(key, default)
    
    def update_if_exists(self, key, new_value):
        """Метод, который захватывает блокировку"""
        with self._lock:
            if key in self._dict:  # Внутренняя операция
                # Вызвать set, который тоже захватывает блокировку
                # С RLock это работает, с Lock вызовет deadlock
                self.set(key, new_value)

3. threading.RWLock (читать-писать блокировка)

Для случаев, когда много читателей, но мало писателей:

import threading

class ReadWriteLock:
    def __init__(self):
        self._read_ready = threading.Condition(
            threading.RLock()
        )
        self._readers = 0
    
    def acquire_read(self):
        """Захватить блокировку для чтения"""
        self._read_ready.acquire()
        self._readers += 1
        self._read_ready.release()
    
    def release_read(self):
        """Освободить блокировку для чтения"""
        self._read_ready.acquire()
        self._readers -= 1
        self._read_ready.notify_all()
        self._read_ready.release()
    
    def acquire_write(self):
        """Захватить блокировку для письма"""
        self._read_ready.acquire()
        while self._readers > 0:
            self._read_ready.wait()
    
    def release_write(self):
        """Освободить блокировку для письма"""
        self._read_ready.notify_all()
        self._read_ready.release()

class ReadWriteDict:
    def __init__(self):
        self._dict = {}
        self._lock = ReadWriteLock()
    
    def get(self, key, default=None):
        """Неблокирующее чтение для множественных читателей"""
        self._lock.acquire_read()
        try:
            return self._dict.get(key, default)
        finally:
            self._lock.release_read()
    
    def set(self, key, value):
        """Исключительная запись"""
        self._lock.acquire_write()
        try:
            self._dict[key] = value
        finally:
            self._lock.release_write()

4. queue.Queue — для очереди операций

Безопасно работать через очередь:

import queue
import threading

class QueueBasedDict:
    def __init__(self):
        self._dict = {}
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()
    
    def _worker(self):
        """Рабочий поток, выполняющий операции"""
        while True:
            operation, args, result_queue = self._queue.get()
            
            if operation == "get":
                key, default = args
                result = self._dict.get(key, default)
            elif operation == "set":
                key, value = args
                self._dict[key] = value
                result = None
            elif operation == "delete":
                key = args
                if key in self._dict:
                    del self._dict[key]
                result = None
            
            if result_queue:
                result_queue.put(result)
    
    def get(self, key, default=None):
        """Получить значение"""
        result_queue = queue.Queue()
        self._queue.put(("get", (key, default), result_queue))
        return result_queue.get()
    
    def set(self, key, value):
        """Установить значение"""
        self._queue.put(("set", (key, value), None))
    
    def delete(self, key):
        """Удалить значение"""
        self._queue.put(("delete", key, None))

5. Встроенный thread-safe queue.Queue

Для простых случаев можно использовать встроенный queue:

import queue
import threading

# queue.Queue уже потокобезопасен
thread_safe_queue = queue.Queue()

def producer():
    for i in range(10):
        thread_safe_queue.put({"id": i, "value": i * 2})

def consumer():
    while True:
        item = thread_safe_queue.get()
        print(item)
        thread_safe_queue.task_done()

# Создаём потоки
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer, daemon=True)

p.start()
c.start()
p.join()

6. collections.OrderedDict с блокировкой

Для сохранения порядка элементов:

import threading
from collections import OrderedDict

class ThreadSafeOrderedDict:
    def __init__(self):
        self._dict = OrderedDict()
        self._lock = threading.Lock()
    
    def set(self, key, value):
        with self._lock:
            self._dict[key] = value
    
    def get(self, key, default=None):
        with self._lock:
            return self._dict.get(key, default)
    
    def items(self):
        """Получить упорядоченные элементы"""
        with self._lock:
            return list(self._dict.items())

7. Практический пример: кэш с потокобезопасностью

import threading
from functools import wraps
from typing import Callable, Any

class ThreadSafeCache:
    def __init__(self, max_size=100):
        self._cache = {}
        self._lock = threading.Lock()
        self._max_size = max_size
    
    def cached(self, func: Callable) -> Callable:
        """Декоратор для кэширования результатов функций"""
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # Создаём ключ кэша
            cache_key = (func.__name__, args, tuple(sorted(kwargs.items())))
            
            with self._lock:
                if cache_key in self._cache:
                    return self._cache[cache_key]
            
            # Вычисляем результат (без блокировки)
            result = func(*args, **kwargs)
            
            with self._lock:
                # Проверяем размер перед вставкой
                if len(self._cache) >= self._max_size:
                    # Удаляем старый элемент (FIFO)
                    self._cache.pop(next(iter(self._cache)))
                
                self._cache[cache_key] = result
            
            return result
        
        return wrapper

# Использование
cache = ThreadSafeCache(max_size=10)

@cache.cached
def expensive_operation(n):
    print(f"Вычисляю для {n}")
    return n * n

# Многопоточный тест
import time

threads = []
for i in range(5):
    t = threading.Thread(
        target=lambda n=i: expensive_operation(n)
    )
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("Все готово")

8. Проблемы, которых следует избегать

Deadlock — взаимная блокировка:

# ❌ Плохо — может вызвать deadlock
lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1():
    with lock1:
        time.sleep(0.1)
        with lock2:  # Ждёт, если thread2 имеет lock2
            pass

def thread2():
    with lock2:
        time.sleep(0.1)
        with lock1:  # Ждёт, если thread1 имеет lock1
            pass

# ✅ Хорошо — всегда захватываем блокировки в одном порядке
def thread1():
    with lock1:
        with lock2:
            pass

def thread2():
    with lock1:  # Тот же порядок!
        with lock2:
            pass

Race condition — состояние гонки:

# ❌ Плохо — не атомарная операция
with lock:
    if key not in safe_dict._dict:
        safe_dict.set(key, expensive_computation())

# ✅ Хорошо — вся операция в блокировке
with lock:
    if key not in safe_dict._dict:
        safe_dict._dict[key] = expensive_computation()

Сравнение подходов

ПодходПростотаПроизводительностьУниверсальность
threading.LockВысокаяСредняяВысокая
threading.RLockСредняяСредняяСредняя
RW LockНизкаяВысокая (много читателей)Средняя
queue.QueueСредняяСредняяСредняя
multiprocessingНизкаяВысокаяНизкая

Лучшие практики

  • По умолчанию используй threading.Lock — простой и эффективный
  • Минимизируй время в блокировке — вычисляй значения до захвата
  • Избегай вложенных блокировок — это источник deadlock
  • Используй with для автоматического освобождения — никогда вручную
  • Тестируй многопоточный код тщательно — race conditions трудно поймать
  • Для I/O операций используй async вместо потоков — быстрее и надежнее

Потокобезопасные структуры данных критичны в многопоточных приложениях, особенно в веб-серверах и обработчиках событий.

Как сделать потокобезопасный словарь? | PrepBro