Какие особенности многопоточности в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Особенности многопоточности в Python
Многопоточность в Python — это одна из самых мальпонимаемых тем. Каждый разработчик, работавший с Python, встречал неожиданные проблемы. Вот что я знаю из опыта проектирования высоконагруженных систем.
1. Global Interpreter Lock (GIL) — главная особенность
В CPython существует глобальная блокировка, которая предотвращает истинный параллелизм:
import threading
import time
def cpu_work():
"""CPU-bound операция"""
count = 0
for i in range(100_000_000):
count += i
return count
# Однопоточное выполнение
start = time.time()
result1 = cpu_work()
result2 = cpu_work()
print(f"Однопоточно: {time.time() - start:.2f}s") # ~8s
# Попытка многопоточности
start = time.time()
t1 = threading.Thread(target=cpu_work)
t2 = threading.Thread(target=cpu_work)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Многопоточно: {time.time() - start:.2f}s") # ~16s (медленнее!)
# GIL не позволяет обеим потокам работать одновременно
# Каждый поток должен дождаться, пока другой отпустит GIL
Почему GIL существует? Потому что управление памятью в CPython использует подсчёт ссылок (reference counting), а не сборку мусора. Без GIL каждое изменение требовало бы глобальную синхронизацию.
2. I/O-bound операции НЕ ограничены GIL
Для операций ввода-вывода многопоточность работает отлично:
import threading
import time
import requests
def fetch_url(url):
"""I/O-bound операция"""
response = requests.get(url, timeout=5)
return len(response.content)
urls = ['https://httpbin.org/delay/2'] * 10
# Однопоточно
start = time.time()
for url in urls:
fetch_url(url)
print(f"Однопоточно: {time.time() - start:.2f}s") # ~20s
# Многопоточно
start = time.time()
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Многопоточно: {time.time() - start:.2f}s") # ~2s (в 10 раз быстрее!)
# Пока один поток ждёт ответа, другой может работать
# GIL отпускается при I/O операциях
3. Threading vs Multiprocessing
При CPU-bound нужен multiprocessing:
from multiprocessing import Pool
import threading
import time
def cpu_task(n):
"""Heavy CPU calculation"""
return sum(i*i for i in range(n))
task_data = [100_000_000] * 4
# Threading — медленно
start = time.time()
threads = [threading.Thread(target=cpu_task, args=(n,)) for n in task_data]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Threading: {time.time() - start:.2f}s") # ~25s
# Multiprocessing — быстро
start = time.time()
with Pool(4) as p:
results = p.map(cpu_task, task_data)
print(f"Multiprocessing: {time.time() - start:.2f}s") # ~7s (в 3.5 раза быстрее)
# Каждый процесс имеет свой GIL
4. GIL отпускается при встроенных операциях
При вызове C-кода GIL может быть отпущен:
import threading
import time
import math
def compute_math():
"""Встроенные математические функции не требуют GIL"""
for _ in range(100_000_000):
math.sqrt(2)
# Однопоточно
start = time.time()
compute_math()
compute_math()
print(f"Однопоточно: {time.time() - start:.2f}s") # ~5s
# Многопоточно
start = time.time()
t1 = threading.Thread(target=compute_math)
t2 = threading.Thread(target=compute_math)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Многопоточно: {time.time() - start:.2f}s") # ~3s (быстрее!)
# math.sqrt() вызывает C-код, при котором GIL отпускается
5. Race Conditions — основная опасность
Многопоточность создаёт проблемы с конкурентным доступом:
import threading
class Counter:
def __init__(self):
self.value = 0
counter = Counter()
def increment():
"""Выглядит просто, но это 3 операции:
1. Прочитать значение
2. Добавить 1
3. Записать значение
"""
for _ in range(1_000_000):
# ❌ Race condition
counter.value += 1
threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Expected: 4_000_000, Got: {counter.value}")
# Результат: 1_234_567 (случайное число, не 4 млн!)
# ✅ Решение — использовать Lock
from threading import Lock
class SafeCounter:
def __init__(self):
self.value = 0
self._lock = Lock()
def increment(self):
with self._lock:
self.value += 1
counter = SafeCounter()
threads = [threading.Thread(target=lambda: [counter.increment() for _ in range(1_000_000)]) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Safe: {counter.value}") # 4_000_000
6. Daemon потоки — фоновые потоки
import threading
import time
def daemon_worker():
"""Фоновый рабочий процесс"""
for i in range(100):
time.sleep(0.1)
print(f"Daemon: {i}")
def main_worker():
"""Основной процесс"""
time.sleep(1)
print("Main finished")
# Daemon поток
t_daemon = threading.Thread(target=daemon_worker, daemon=True)
t_daemon.start()
# Основной поток
t_main = threading.Thread(target=main_worker)
t_main.start()
t_main.join()
# Программа завершится, когда закончатся основные потоки
# Daemon потоки будут прерваны
print("Program exit")
7. Thread Local Storage — данные для каждого потока
import threading
thread_local = threading.local()
def worker(user_id):
# Каждый поток имеет собственный thread_local
thread_local.user_id = user_id
thread_local.db_connection = f"conn_{user_id}"
# Другие потоки не видят эти значения
print(f"Thread {threading.current_thread().name}: user={thread_local.user_id}")
# Используем соединение
query_result = f"SELECT FROM {thread_local.db_connection}"
print(query_result)
threads = [threading.Thread(target=worker, args=(i,), name=f"Worker-{i}") for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
8. Queue — потокобезопасная передача данных
from queue import Queue
import threading
import time
work_queue = Queue(maxsize=100)
results = Queue()
errors = 0
def producer():
"""Производитель — добавляет задачи"""
for i in range(100):
work_queue.put(f"task_{i}")
time.sleep(0.01)
def consumer(worker_id):
"""Потребитель — обрабатывает задачи"""
while True:
try:
task = work_queue.get(timeout=1)
if task is None:
break
# Обработка
result = f"result_{task}"
results.put((worker_id, result))
work_queue.task_done()
except:
break
# Создаём потоки
p_thread = threading.Thread(target=producer)
c_threads = [threading.Thread(target=consumer, args=(i,)) for i in range(4)]
p_thread.start()
for t in c_threads:
t.start()
p_thread.join()
work_queue.join() # Ждём, пока все задачи обработаны
for _ in range(4):
work_queue.put(None) # Stop signal
for t in c_threads:
t.join()
print(f"Results: {results.qsize()}")
9. Asyncio вместо Threading
Для I/O-bound операций asyncio часто лучше:
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = ['https://httpbin.org/delay/2'] * 10
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Все 10 запросов параллельны в одном потоке
results = asyncio.run(main())
print(f"Fetched {len(results)} URLs")
# Asyncio:
# - Один поток (нет context switching)
# - Выше throughput
# - Проще отладить
# - Менее голодный по памяти
10. Python 3.13+ — удаление GIL в экспериментальном режиме
# Python 3.13 может запуститься БЕЗ GIL:
# python -X gil=0 your_script.py
# Но это НЕ решает все проблемы потокобезопасности
# Race conditions всё равно существуют!
import threading
counter = 0
def increment():
global counter
for _ in range(1_000_000):
counter += 1 # Всё ещё race condition!
threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # Может быть не 4_000_000 даже без GIL!
Рекомендации для многопоточности в Python
- CPU-bound → multiprocessing — обходим GIL
- I/O-bound → asyncio — эффективнее threading
- Threading только для блокирующего I/O — файлы, сеть, БД
- Всегда используйте Lock — для доступа к общему состоянию
- Queue вместо прямого доступа — безопаснее
- Thread-local для контекстных данных — пользователь, соединение
- Профилируйте перед оптимизацией — где реальное узкое место
- Избегайте вложенных Lock'ов — риск deadlock
- Тестируйте многопоточный код — проблемы проявляются редко
- Документируйте потокобезопасность — явно указывайте
Многопоточность в Python — это мощный инструмент, но требует уважения. Используйте asyncio по умолчанию для I/O, multiprocessing для CPU, и threading только в крайнем случае для блокирующего I/O, который не может быть переведён на async.