Как сделать потокобезопасный словарь?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Как сделать потокобезопасный словарь
Обычный Python словарь (dict) не является потокобезопасным. При одновременном доступе из нескольких потоков может произойти повреждение данных. Существует несколько подходов для создания потокобезопасного словаря.
1. threading.Lock — базовый подход
Оберните обычный dict в класс с использованием мьютекса:
import threading
from typing import Any, Optional
class ThreadSafeDict:
def __init__(self):
self._dict = {}
self._lock = threading.Lock()
def get(self, key: str, default: Any = None) -> Any:
"""Безопасно получить значение"""
with self._lock:
return self._dict.get(key, default)
def set(self, key: str, value: Any) -> None:
"""Безопасно установить значение"""
with self._lock:
self._dict[key] = value
def delete(self, key: str) -> None:
"""Безопасно удалить значение"""
with self._lock:
if key in self._dict:
del self._dict[key]
def items(self):
"""Безопасно получить все элементы"""
with self._lock:
return list(self._dict.items())
def __contains__(self, key: str) -> bool:
"""Проверить наличие ключа"""
with self._lock:
return key in self._dict
def __len__(self) -> int:
"""Получить размер"""
with self._lock:
return len(self._dict)
# Использование
safe_dict = ThreadSafeDict()
safe_dict.set("name", "Alice")
print(safe_dict.get("name")) # Alice
Плюсы:
- Простой и понятный код
- Полный контроль
- Работает везде
Минусы:
- Нужно вручную оборачивать все операции
- Может быть медленным при частых блокировках
- Не поддерживает обычный dict интерфейс
2. threading.RLock — для рекурсивных операций
Eсли одному потоку нужно несколько раз захватить блокировку:
import threading
class RecursiveThreadSafeDict:
def __init__(self):
self._dict = {}
self._lock = threading.RLock() # Рекурсивный мьютекс
def set(self, key, value):
with self._lock:
self._dict[key] = value
def get(self, key, default=None):
with self._lock:
return self._dict.get(key, default)
def update_if_exists(self, key, new_value):
"""Метод, который захватывает блокировку"""
with self._lock:
if key in self._dict: # Внутренняя операция
# Вызвать set, который тоже захватывает блокировку
# С RLock это работает, с Lock вызовет deadlock
self.set(key, new_value)
3. threading.RWLock (читать-писать блокировка)
Для случаев, когда много читателей, но мало писателей:
import threading
class ReadWriteLock:
def __init__(self):
self._read_ready = threading.Condition(
threading.RLock()
)
self._readers = 0
def acquire_read(self):
"""Захватить блокировку для чтения"""
self._read_ready.acquire()
self._readers += 1
self._read_ready.release()
def release_read(self):
"""Освободить блокировку для чтения"""
self._read_ready.acquire()
self._readers -= 1
self._read_ready.notify_all()
self._read_ready.release()
def acquire_write(self):
"""Захватить блокировку для письма"""
self._read_ready.acquire()
while self._readers > 0:
self._read_ready.wait()
def release_write(self):
"""Освободить блокировку для письма"""
self._read_ready.notify_all()
self._read_ready.release()
class ReadWriteDict:
def __init__(self):
self._dict = {}
self._lock = ReadWriteLock()
def get(self, key, default=None):
"""Неблокирующее чтение для множественных читателей"""
self._lock.acquire_read()
try:
return self._dict.get(key, default)
finally:
self._lock.release_read()
def set(self, key, value):
"""Исключительная запись"""
self._lock.acquire_write()
try:
self._dict[key] = value
finally:
self._lock.release_write()
4. queue.Queue — для очереди операций
Безопасно работать через очередь:
import queue
import threading
class QueueBasedDict:
def __init__(self):
self._dict = {}
self._queue = queue.Queue()
self._thread = threading.Thread(target=self._worker, daemon=True)
self._thread.start()
def _worker(self):
"""Рабочий поток, выполняющий операции"""
while True:
operation, args, result_queue = self._queue.get()
if operation == "get":
key, default = args
result = self._dict.get(key, default)
elif operation == "set":
key, value = args
self._dict[key] = value
result = None
elif operation == "delete":
key = args
if key in self._dict:
del self._dict[key]
result = None
if result_queue:
result_queue.put(result)
def get(self, key, default=None):
"""Получить значение"""
result_queue = queue.Queue()
self._queue.put(("get", (key, default), result_queue))
return result_queue.get()
def set(self, key, value):
"""Установить значение"""
self._queue.put(("set", (key, value), None))
def delete(self, key):
"""Удалить значение"""
self._queue.put(("delete", key, None))
5. Встроенный thread-safe queue.Queue
Для простых случаев можно использовать встроенный queue:
import queue
import threading
# queue.Queue уже потокобезопасен
thread_safe_queue = queue.Queue()
def producer():
for i in range(10):
thread_safe_queue.put({"id": i, "value": i * 2})
def consumer():
while True:
item = thread_safe_queue.get()
print(item)
thread_safe_queue.task_done()
# Создаём потоки
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer, daemon=True)
p.start()
c.start()
p.join()
6. collections.OrderedDict с блокировкой
Для сохранения порядка элементов:
import threading
from collections import OrderedDict
class ThreadSafeOrderedDict:
def __init__(self):
self._dict = OrderedDict()
self._lock = threading.Lock()
def set(self, key, value):
with self._lock:
self._dict[key] = value
def get(self, key, default=None):
with self._lock:
return self._dict.get(key, default)
def items(self):
"""Получить упорядоченные элементы"""
with self._lock:
return list(self._dict.items())
7. Практический пример: кэш с потокобезопасностью
import threading
from functools import wraps
from typing import Callable, Any
class ThreadSafeCache:
def __init__(self, max_size=100):
self._cache = {}
self._lock = threading.Lock()
self._max_size = max_size
def cached(self, func: Callable) -> Callable:
"""Декоратор для кэширования результатов функций"""
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
# Создаём ключ кэша
cache_key = (func.__name__, args, tuple(sorted(kwargs.items())))
with self._lock:
if cache_key in self._cache:
return self._cache[cache_key]
# Вычисляем результат (без блокировки)
result = func(*args, **kwargs)
with self._lock:
# Проверяем размер перед вставкой
if len(self._cache) >= self._max_size:
# Удаляем старый элемент (FIFO)
self._cache.pop(next(iter(self._cache)))
self._cache[cache_key] = result
return result
return wrapper
# Использование
cache = ThreadSafeCache(max_size=10)
@cache.cached
def expensive_operation(n):
print(f"Вычисляю для {n}")
return n * n
# Многопоточный тест
import time
threads = []
for i in range(5):
t = threading.Thread(
target=lambda n=i: expensive_operation(n)
)
threads.append(t)
t.start()
for t in threads:
t.join()
print("Все готово")
8. Проблемы, которых следует избегать
Deadlock — взаимная блокировка:
# ❌ Плохо — может вызвать deadlock
lock1 = threading.Lock()
lock2 = threading.Lock()
def thread1():
with lock1:
time.sleep(0.1)
with lock2: # Ждёт, если thread2 имеет lock2
pass
def thread2():
with lock2:
time.sleep(0.1)
with lock1: # Ждёт, если thread1 имеет lock1
pass
# ✅ Хорошо — всегда захватываем блокировки в одном порядке
def thread1():
with lock1:
with lock2:
pass
def thread2():
with lock1: # Тот же порядок!
with lock2:
pass
Race condition — состояние гонки:
# ❌ Плохо — не атомарная операция
with lock:
if key not in safe_dict._dict:
safe_dict.set(key, expensive_computation())
# ✅ Хорошо — вся операция в блокировке
with lock:
if key not in safe_dict._dict:
safe_dict._dict[key] = expensive_computation()
Сравнение подходов
| Подход | Простота | Производительность | Универсальность |
|---|---|---|---|
| threading.Lock | Высокая | Средняя | Высокая |
| threading.RLock | Средняя | Средняя | Средняя |
| RW Lock | Низкая | Высокая (много читателей) | Средняя |
| queue.Queue | Средняя | Средняя | Средняя |
| multiprocessing | Низкая | Высокая | Низкая |
Лучшие практики
- По умолчанию используй threading.Lock — простой и эффективный
- Минимизируй время в блокировке — вычисляй значения до захвата
- Избегай вложенных блокировок — это источник deadlock
- Используй
withдля автоматического освобождения — никогда вручную - Тестируй многопоточный код тщательно — race conditions трудно поймать
- Для I/O операций используй async вместо потоков — быстрее и надежнее
Потокобезопасные структуры данных критичны в многопоточных приложениях, особенно в веб-серверах и обработчиках событий.