← Назад к вопросам

Как реализовать многопоточность?

2.0 Middle🔥 171 комментариев
#Python Core#Soft Skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Многопоточность в Python

Многопоточность — это техника для параллельного выполнения кода. В Python это сложнее, чем в других языках, из-за GIL (Global Interpreter Lock), но она остаётся полезной для I/O операций. Расскажу о всех подходах.

Понимание GIL (Global Interpreter Lock)

GIL — это мьютекс, который не позволяет нескольким потокам одновременно выполнять Python код. Это означает, что многопоточность не даёт ускорение для CPU-bound задач, но помогает для I/O-bound.

import time
import threading

def cpu_bound_task():
    """CPU-intensive операция"""
    total = 0
    for i in range(100_000_000):
        total += i
    return total

# ОДНОПОТОК: 5 секунд
start = time.time()
cpu_bound_task()
cpu_bound_task()
print(f"Single thread: {time.time() - start:.2f}s")  # ~5s

# МНОГОПОТОК: всё ещё ~5 секунд из-за GIL
start = time.time()
t1 = threading.Thread(target=cpu_bound_task)
t2 = threading.Thread(target=cpu_bound_task)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Multi thread: {time.time() - start:.2f}s")  # ~5s (GIL!)

Когда использовать многопоточность

import time
import threading
import requests

def io_bound_task(url):
    """I/O операция (блокирует поток, но освобождает GIL)"""
    response = requests.get(url, timeout=5)
    return response.status_code

# ОДНОПОТОК: 10 секунд (10 запросов * 1 сек)
start = time.time()
for i in range(10):
    io_bound_task("https://example.com")
print(f"Single thread: {time.time() - start:.2f}s")  # ~10s

# МНОГОПОТОК: ~1 секунда (все параллельно)
start = time.time()
threads = []
for i in range(10):
    t = threading.Thread(target=io_bound_task, args=("https://example.com",))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Multi thread: {time.time() - start:.2f}s")  # ~1s

threading.Thread (базовый API)

import threading
import time

class DownloadThread(threading.Thread):
    def __init__(self, url, filename):
        super().__init__()
        self.url = url
        self.filename = filename
    
    def run(self):
        """Выполняется в отдельном потоке"""
        print(f"Downloading {self.url}...")
        # Реальная загрузка
        time.sleep(1)
        print(f"Saved to {self.filename}")

# Использование
thread = DownloadThread("https://example.com/file.zip", "file.zip")
thread.start()  # Запустить поток
thread.join()   # Ждём завершения

print("Done!")

ThreadPoolExecutor (рекомендуется)

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

def download_file(url):
    response = requests.get(url, timeout=5)
    return f"{url}: {response.status_code}"

urls = [
    "https://example.com",
    "https://example.com/page1",
    "https://example.com/page2",
]

# Способ 1: with statement (автоматически закрывает)
start = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
    # submit() добавляет задачу в очередь
    futures = [executor.submit(download_file, url) for url in urls]
    
    # as_completed() возвращает результаты по мере готовности
    for future in as_completed(futures):
        try:
            result = future.result(timeout=10)  # Может выбросить исключение
            print(result)
        except Exception as e:
            print(f"Error: {e}")

print(f"Total time: {time.time() - start:.2f}s")

# Способ 2: map() для простых случаев
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(download_file, urls, timeout=10)
    for result in results:
        print(result)

Lock (синхронизация доступа)

import threading

class BankAccount:
    def __init__(self, balance: float):
        self.balance = balance
        self.lock = threading.Lock()  # Защита от race conditions
    
    def withdraw(self, amount: float):
        with self.lock:  # Захватываем блокировку
            if self.balance >= amount:
                self.balance -= amount
                return True
        return False
    
    def deposit(self, amount: float):
        with self.lock:
            self.balance += amount

account = BankAccount(1000)

def withdraw_repeatedly():
    for _ in range(100):
        account.withdraw(10)

# Без Lock может быть race condition
threads = [threading.Thread(target=withdraw_repeatedly) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Final balance: {account.balance}")  # 0 (100 * 10 * 10)

RLock (переиспользуемая блокировка)

import threading

class ReentrantExample:
    def __init__(self):
        self.rlock = threading.RLock()  # Один поток может захватить несколько раз
    
    def method_a(self):
        with self.rlock:
            print("In method_a")
            self.method_b()
    
    def method_b(self):
        with self.rlock:  # Тот же поток может захватить снова
            print("In method_b")

example = ReentrantExample()
example.method_a()  # Работает благодаря RLock

Event (сигнализация между потоками)

import threading
import time

class DataProcessor:
    def __init__(self):
        self.ready = threading.Event()  # Флаг готовности
        self.data = None
    
    def producer(self):
        time.sleep(2)  # Долгая операция
        self.data = [1, 2, 3, 4, 5]
        self.ready.set()  # Сигнализируем, что данные готовы
    
    def consumer(self):
        self.ready.wait()  # Ждём сигнала
        print(f"Processing: {self.data}")

processor = DataProcessor()

t1 = threading.Thread(target=processor.producer)
t2 = threading.Thread(target=processor.consumer)

t1.start()
t2.start()

t1.join()
t2.join()

Queue (безопасная очередь между потоками)

import queue
import threading
import time

def producer(q: queue.Queue):
    for i in range(5):
        item = f"Item {i}"
        q.put(item)  # Добавить в очередь
        print(f"Produced: {item}")
        time.sleep(0.5)

def consumer(q: queue.Queue):
    while True:
        try:
            item = q.get(timeout=2)  # Получить из очереди
            if item is None:  # Сигнал завершения
                break
            print(f"Consumed: {item}")
            q.task_done()  # Отметить как обработано
        except queue.Empty:
            break

q = queue.Queue(maxsize=3)  # Очередь с максимум 3 элементами

t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q,))

t1.start()
t2.start()

t1.join()
q.put(None)  # Сигнал завершения
t2.join()

Daemon потоки

import threading
import time

def daemon_task():
    while True:
        print("Daemon running...")
        time.sleep(1)

# Daemon поток завершится, когда основной поток завершится
daemon = threading.Thread(target=daemon_task, daemon=True)
daemon.start()

time.sleep(3)
print("Main thread finished")
# Daemon автоматически завершится

Проблемы и Best Practices

# ПРОБЛЕМА: Race condition
shared_counter = 0

def increment():
    global shared_counter
    for _ in range(1_000_000):
        shared_counter += 1  # NOT THREAD-SAFE

# Результат: меньше 2_000_000 из-за race condition

# РЕШЕНИЕ: используй Lock
import threading

shared_counter = 0
lock = threading.Lock()

def increment_safe():
    global shared_counter
    for _ in range(1_000_000):
        with lock:
            shared_counter += 1

# Результат: ровно 2_000_000

Когда NOT использовать многопоточность

  • CPU-bound операции — используй multiprocessing
  • Асинхронный код — используй asyncio
  • Микросервисы — лучше использовать очереди (RabbitMQ, Redis)

Summary

  • Используй threading для I/O-bound операций
  • Используй ThreadPoolExecutor вместо Thread напрямую
  • Защищай общие данные Lock или Queue
  • Помни про GIL — не поможет для CPU-bound
  • Предпочитай asyncio для большого количества параллельных I/O операций