← Назад к вопросам
Как реализовать многопоточность?
2.0 Middle🔥 171 комментариев
#Python Core#Soft Skills
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Многопоточность в Python
Многопоточность — это техника для параллельного выполнения кода. В Python это сложнее, чем в других языках, из-за GIL (Global Interpreter Lock), но она остаётся полезной для I/O операций. Расскажу о всех подходах.
Понимание GIL (Global Interpreter Lock)
GIL — это мьютекс, который не позволяет нескольким потокам одновременно выполнять Python код. Это означает, что многопоточность не даёт ускорение для CPU-bound задач, но помогает для I/O-bound.
import time
import threading
def cpu_bound_task():
"""CPU-intensive операция"""
total = 0
for i in range(100_000_000):
total += i
return total
# ОДНОПОТОК: 5 секунд
start = time.time()
cpu_bound_task()
cpu_bound_task()
print(f"Single thread: {time.time() - start:.2f}s") # ~5s
# МНОГОПОТОК: всё ещё ~5 секунд из-за GIL
start = time.time()
t1 = threading.Thread(target=cpu_bound_task)
t2 = threading.Thread(target=cpu_bound_task)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Multi thread: {time.time() - start:.2f}s") # ~5s (GIL!)
Когда использовать многопоточность
import time
import threading
import requests
def io_bound_task(url):
"""I/O операция (блокирует поток, но освобождает GIL)"""
response = requests.get(url, timeout=5)
return response.status_code
# ОДНОПОТОК: 10 секунд (10 запросов * 1 сек)
start = time.time()
for i in range(10):
io_bound_task("https://example.com")
print(f"Single thread: {time.time() - start:.2f}s") # ~10s
# МНОГОПОТОК: ~1 секунда (все параллельно)
start = time.time()
threads = []
for i in range(10):
t = threading.Thread(target=io_bound_task, args=("https://example.com",))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Multi thread: {time.time() - start:.2f}s") # ~1s
threading.Thread (базовый API)
import threading
import time
class DownloadThread(threading.Thread):
def __init__(self, url, filename):
super().__init__()
self.url = url
self.filename = filename
def run(self):
"""Выполняется в отдельном потоке"""
print(f"Downloading {self.url}...")
# Реальная загрузка
time.sleep(1)
print(f"Saved to {self.filename}")
# Использование
thread = DownloadThread("https://example.com/file.zip", "file.zip")
thread.start() # Запустить поток
thread.join() # Ждём завершения
print("Done!")
ThreadPoolExecutor (рекомендуется)
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time
def download_file(url):
response = requests.get(url, timeout=5)
return f"{url}: {response.status_code}"
urls = [
"https://example.com",
"https://example.com/page1",
"https://example.com/page2",
]
# Способ 1: with statement (автоматически закрывает)
start = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
# submit() добавляет задачу в очередь
futures = [executor.submit(download_file, url) for url in urls]
# as_completed() возвращает результаты по мере готовности
for future in as_completed(futures):
try:
result = future.result(timeout=10) # Может выбросить исключение
print(result)
except Exception as e:
print(f"Error: {e}")
print(f"Total time: {time.time() - start:.2f}s")
# Способ 2: map() для простых случаев
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(download_file, urls, timeout=10)
for result in results:
print(result)
Lock (синхронизация доступа)
import threading
class BankAccount:
def __init__(self, balance: float):
self.balance = balance
self.lock = threading.Lock() # Защита от race conditions
def withdraw(self, amount: float):
with self.lock: # Захватываем блокировку
if self.balance >= amount:
self.balance -= amount
return True
return False
def deposit(self, amount: float):
with self.lock:
self.balance += amount
account = BankAccount(1000)
def withdraw_repeatedly():
for _ in range(100):
account.withdraw(10)
# Без Lock может быть race condition
threads = [threading.Thread(target=withdraw_repeatedly) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Final balance: {account.balance}") # 0 (100 * 10 * 10)
RLock (переиспользуемая блокировка)
import threading
class ReentrantExample:
def __init__(self):
self.rlock = threading.RLock() # Один поток может захватить несколько раз
def method_a(self):
with self.rlock:
print("In method_a")
self.method_b()
def method_b(self):
with self.rlock: # Тот же поток может захватить снова
print("In method_b")
example = ReentrantExample()
example.method_a() # Работает благодаря RLock
Event (сигнализация между потоками)
import threading
import time
class DataProcessor:
def __init__(self):
self.ready = threading.Event() # Флаг готовности
self.data = None
def producer(self):
time.sleep(2) # Долгая операция
self.data = [1, 2, 3, 4, 5]
self.ready.set() # Сигнализируем, что данные готовы
def consumer(self):
self.ready.wait() # Ждём сигнала
print(f"Processing: {self.data}")
processor = DataProcessor()
t1 = threading.Thread(target=processor.producer)
t2 = threading.Thread(target=processor.consumer)
t1.start()
t2.start()
t1.join()
t2.join()
Queue (безопасная очередь между потоками)
import queue
import threading
import time
def producer(q: queue.Queue):
for i in range(5):
item = f"Item {i}"
q.put(item) # Добавить в очередь
print(f"Produced: {item}")
time.sleep(0.5)
def consumer(q: queue.Queue):
while True:
try:
item = q.get(timeout=2) # Получить из очереди
if item is None: # Сигнал завершения
break
print(f"Consumed: {item}")
q.task_done() # Отметить как обработано
except queue.Empty:
break
q = queue.Queue(maxsize=3) # Очередь с максимум 3 элементами
t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q,))
t1.start()
t2.start()
t1.join()
q.put(None) # Сигнал завершения
t2.join()
Daemon потоки
import threading
import time
def daemon_task():
while True:
print("Daemon running...")
time.sleep(1)
# Daemon поток завершится, когда основной поток завершится
daemon = threading.Thread(target=daemon_task, daemon=True)
daemon.start()
time.sleep(3)
print("Main thread finished")
# Daemon автоматически завершится
Проблемы и Best Practices
# ПРОБЛЕМА: Race condition
shared_counter = 0
def increment():
global shared_counter
for _ in range(1_000_000):
shared_counter += 1 # NOT THREAD-SAFE
# Результат: меньше 2_000_000 из-за race condition
# РЕШЕНИЕ: используй Lock
import threading
shared_counter = 0
lock = threading.Lock()
def increment_safe():
global shared_counter
for _ in range(1_000_000):
with lock:
shared_counter += 1
# Результат: ровно 2_000_000
Когда NOT использовать многопоточность
- CPU-bound операции — используй multiprocessing
- Асинхронный код — используй asyncio
- Микросервисы — лучше использовать очереди (RabbitMQ, Redis)
Summary
- Используй threading для I/O-bound операций
- Используй ThreadPoolExecutor вместо Thread напрямую
- Защищай общие данные Lock или Queue
- Помни про GIL — не поможет для CPU-bound
- Предпочитай asyncio для большого количества параллельных I/O операций