Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Проблемы многопоточности в Python
Мультипоточность — это мощный инструмент, но она вносит сложность. За 10+ лет работы я столкнулся со множеством проблем, расскажу о них и решениях.
1. Race Condition (Состояние гонки)
Несколько потоков одновременно изменяют общее состояние — результат непредсказуем:
import threading
counter = 0
lock = threading.Lock()
def increment_without_lock():
global counter
for _ in range(1000000):
counter += 1 # ОПАСНО: не атомарно
# Запуск двух потоков
t1 = threading.Thread(target=increment_without_lock)
t2 = threading.Thread(target=increment_without_lock)
t1.start()
t2.start()
t1.join()
t2.join()
print(counter) # Ожидаем 2000000, но получим меньше (например 1234567)
Решение — использовать синхронизацию:
def increment_with_lock():
global counter
for _ in range(1000000):
with lock:
counter += 1 # Атомарно
# Теперь результат всегда 2000000
2. Deadlock (Взаимная блокировка)
Потоки ждут друг друга и зависают навечно:
import threading
import time
lock1 = threading.Lock()
lock2 = threading.Lock()
def thread1_func():
with lock1:
print("T1: захватил lock1")
time.sleep(0.5) # Даём T2 время захватить lock2
with lock2: # DEADLOCK: T2 уже захватил lock2!
print("T1: захватил lock2")
def thread2_func():
with lock2:
print("T2: захватил lock2")
time.sleep(0.5)
with lock1: # DEADLOCK: T1 уже захватил lock1!
print("T2: захватил lock1")
t1 = threading.Thread(target=thread1_func)
t2 = threading.Thread(target=thread2_func)
t1.start()
t2.start()
t1.join() # Зависнет здесь навечно
t2.join()
Решение — всегда захватывать блокировки в одинаковом порядке:
def thread1_func_safe():
with lock1:
with lock2: # Захватываем сначала lock1, потом lock2
print("T1: работаем")
def thread2_func_safe():
with lock1: # Тот же порядок
with lock2:
print("T2: работаем")
3. GIL (Global Interpreter Lock)
В CPython только один поток выполняет Python код одновременно. CPU-bound задачи НЕ масштабируются:
import threading
import time
def cpu_bound_task(n):
"""Дорогостоящее вычисление"""
result = 0
for i in range(n):
result += i ** 2
return result
start = time.time()
# Последовательно (1 поток)
result = cpu_bound_task(100000000)
result = cpu_bound_task(100000000)
sequential_time = time.time() - start
print(f"Последовательно: {sequential_time:.2f}s")
# С потоками (2 потока)
start = time.time()
t1 = threading.Thread(target=cpu_bound_task, args=(100000000,))
t2 = threading.Thread(target=cpu_bound_task, args=(100000000,))
t1.start()
t2.start()
t1.join()
t2.join()
threaded_time = time.time() - start
print(f"С потоками: {threaded_time:.2f}s") # Почти такой же, не быстрее!
Решение — использовать multiprocessing для CPU-bound:
from multiprocessing import Pool
with Pool(processes=2) as pool:
results = pool.map(cpu_bound_task, [100000000, 100000000])
4. Starvation (Голодание потока)
Один поток монополизирует ресурс, другие ждут:
import threading
lock = threading.Lock()
def greedy_thread():
for _ in range(1000):
with lock:
print("Greedy работает")
# Долгая операция
def starving_thread():
for _ in range(10):
with lock:
print("Starving работает (редко!)")
t1 = threading.Thread(target=greedy_thread)
t2 = threading.Thread(target=starving_thread)
t1.start()
t2.start()
Решение — уменьшить время в критической секции:
def fair_thread():
for _ in range(1000):
with lock:
# Только самое необходимое
data = shared_list.pop(0)
# Дорогостоящая работа БЕЗ блокировки
process(data)
5. Memory Visibility (Видимость памяти)
Один поток изменил переменную, другой не видит изменения (компилятор оптимизировал):
import threading
import time
flag = False
def writer():
global flag
time.sleep(1)
flag = True # Пишем
print("Writer: set flag = True")
def reader():
while not flag: # INFINITE LOOP!
pass
print("Reader: flag is True")
t1 = threading.Thread(target=writer)
t2 = threading.Thread(target=reader)
t1.start()
t2.start()
Решение — использовать синхронизацию (она гарантирует memory visibility):
flag = False
lock = threading.Lock()
def writer():
global flag
time.sleep(1)
with lock:
flag = True
def reader():
while True:
with lock:
if flag:
break
print("Reader: flag is True")
6. Livelock (Живая блокировка)
Потоки активны, но не делают полезную работу (постоянно конкурируют):
import threading
import time
lock = threading.Lock()
def thread_func():
while True:
if lock.acquire(blocking=False): # try_lock
try:
print(f"{threading.current_thread().name}: работаю")
time.sleep(0.01)
finally:
lock.release()
else:
print(f"{threading.current_thread().name}: не захватил, попробую позже")
time.sleep(0.001) # Много попыток, CPU горит
for i in range(2):
t = threading.Thread(target=thread_func, daemon=True)
t.start()
time.sleep(2)
Решение — использовать условные переменные вместо busy-waiting:
condition = threading.Condition()
def thread_func_safe():
with condition:
while not can_work(): # Проверка
condition.wait() # Спит, не горит CPU
# работаем
condition.notify_all() # Разбуди других
7. Data Corruption (Повреждение данных)
Сложные структуры данных при одновременном доступе рискуют быть повреждены:
import threading
class BankAccount:
def __init__(self):
self.balance = 1000
self.lock = threading.Lock()
def withdraw(self, amount):
# ОПАСНО без блокировки
if self.balance >= amount:
self.balance -= amount
def withdraw_safe(self, amount):
# Безопасно с блокировкой
with self.lock:
if self.balance >= amount:
self.balance -= amount
return True
return False
Best Practices
- Минимизируй критические секции — блокируй только необходимое
- Избегай вложенных блокировок — если нужны, всегда в одном порядке
- Используй asyncio для I/O — потоки не нужны для I/O
- Используй multiprocessing для CPU — обходит GIL
- Immutable структуры — избежишь race conditions
- Принцип KISS — простая однопоточная архитектура лучше, чем сложная многопоточная
В production я избегаю явного threading, предпочитаю asyncio или микросервисы с очередями (Celery, RabbitMQ).