← Назад к вопросам

Что такое распараллеривание потоков?

1.7 Middle🔥 101 комментариев
#DevOps и инфраструктура#Django

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Распараллеривание потоков (Thread Parallelization)

Распараллеривание потоков — это техника, позволяющая программе одновременно выполнять несколько задач в рамках одного процесса, используя несколько потоков (threads). Каждый поток имеет свой стек вызовов и счётчик команд, но все потоки одного процесса работают с общей памятью.

Различие: потоки vs процессы

  • Потоки (Threads): лёгкие, быстрое переключение, общая память
  • Процессы (Processes): тяжёлые, изолированная память, медленнее

Основные концепции

1. Создание потоков

import threading
import time
from typing import Any

def worker(name: str, delay: int) -> None:
    """Функция для выполнения в отдельном потоке"""
    for i in range(3):
        time.sleep(delay)
        print(f"{name}: итерация {i}")

# Способ 1: Простой поток
thread1 = threading.Thread(target=worker, args=("Поток-1", 1))
thread2 = threading.Thread(target=worker, args=("Поток-2", 0.5))

# Запуск потоков
thread1.start()
thread2.start()

# Ожидание завершения
thread1.join()
thread2.join()

print("Все потоки завершены")

2. Класс-подкласс Thread

class MyThread(threading.Thread):
    def __init__(self, name: str, count: int):
        super().__init__()
        self.name = name
        self.count = count
    
    def run(self) -> None:
        """Переопределяем метод run"""
        for i in range(self.count):
            print(f"{self.name}: {i}")
            time.sleep(0.1)

thread = MyThread("CustomThread", 5)
thread.start()
thread.join()

3. Демон-потоки

Демон-потоки автоматически завершаются при выходе основной программы:

def background_task():
    while True:
        print("Фоновая задача работает...")
        time.sleep(1)

thread = threading.Thread(target=background_task, daemon=True)
thread.start()

time.sleep(3)
print("Программа завершается")
# Демон-поток автоматически остановится

Синхронизация потоков

1. Lock (Мьютекс)

Предотвращает одновременный доступ нескольких потоков к ресурсу:

lock = threading.Lock()
counter = 0

def increment():
    global counter
    for _ in range(100000):
        with lock:  # Критическая секция
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(5)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter: {counter}")  # 500000 (корректно)

2. RLock (Рекурсивная блокировка)

Позволяет одному потоку много раз получить одну блокировку:

rlock = threading.RLock()

def function_a():
    with rlock:
        print("A: критическая секция")
        function_b()

def function_b():
    with rlock:  # Один поток может получить RLock множество раз
        print("B: критическая секция")

thread = threading.Thread(target=function_a)
thread.start()
thread.join()

3. Semaphore (Семафор)

Ограничивает количество потоков, одновременно заходящих в ресурс:

sem = threading.Semaphore(2)  # Максимум 2 потока одновременно

def access_resource(id: int):
    with sem:
        print(f"Поток {id} использует ресурс")
        time.sleep(1)
        print(f"Поток {id} освобождает ресурс")

threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(5)]

for t in threads:
    t.start()
for t in threads:
    t.join()

4. Event (Событие)

Один поток может сигнализировать, а другие ждать события:

event = threading.Event()

def waiter(name: str):
    print(f"{name}: ожидает событие")
    event.wait()  # Блокируется до set()
    print(f"{name}: событие произошло!")

def setter():
    time.sleep(2)
    print("Setter: генерирует событие")
    event.set()

threads = [threading.Thread(target=waiter, args=(f"Waiter-{i}",)) for i in range(3)]
threads.append(threading.Thread(target=setter))

for t in threads:
    t.start()
for t in threads:
    t.join()

5. Condition (Условная переменная)

Для координации нескольких потоков на основе условия:

condition = threading.Condition()
resource = []

def producer():
    for i in range(5):
        with condition:
            resource.append(i)
            print(f"Производитель: добавил {i}")
            condition.notify_all()  # Уведомляем потребителей
        time.sleep(0.5)

def consumer(name: str):
    while True:
        with condition:
            while not resource:  # Проверяем условие
                print(f"{name}: ждёт данные")
                condition.wait()  # Ждём уведомления
            
            if resource:
                item = resource.pop(0)
                print(f"{name}: получил {item}")

producer_thread = threading.Thread(target=producer)
consumer_threads = [threading.Thread(target=consumer, args=(f"Consumer-{i}",), daemon=True) for i in range(2)]

producer_thread.start()
for t in consumer_threads:
    t.start()

producer_thread.join()
time.sleep(1)

Queue (Очередь потокобезопасная)

Идеальна для обмена данными между потоками:

from queue import Queue

queue = Queue(maxsize=5)

def producer():
    for i in range(10):
        queue.put(i)
        print(f"Произведено: {i}")
        time.sleep(0.2)

def consumer(id: int):
    while True:
        item = queue.get()
        print(f"Потребитель {id} получил: {item}")
        time.sleep(0.5)
        queue.task_done()

producer_thread = threading.Thread(target=producer)
consumer_threads = [threading.Thread(target=consumer, args=(i,), daemon=True) for i in range(3)]

producer_thread.start()
for t in consumer_threads:
    t.start()

producer_thread.join()
queue.join()  # Ждёт завершения всех задач

GIL (Global Interpreter Lock)

Важно знать: Python имеет GIL, который предотвращает истинный параллелизм потоков при использовании CPU-bound операций:

import threading
import time

def cpu_bound_task(n: int) -> int:
    """Тяжёлая операция"""
    result = 0
    for i in range(50000000):
        result += i
    return result

# Один поток
start = time.time()
cpu_bound_task(1)
print(f"Один поток: {time.time() - start:.2f}s")

# Два потока (медленнее из-за GIL!)
start = time.time()
threads = [threading.Thread(target=cpu_bound_task, args=(1,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Два потока: {time.time() - start:.2f}s")  # Медленнее!

# Решение: использовать multiprocessing для CPU-bound
from multiprocessing import Pool

start = time.time()
with Pool(2) as p:
    p.map(cpu_bound_task, [1, 1])
print(f"Два процесса: {time.time() - start:.2f}s")  # Быстрее!

Когда использовать потоки

I/O-bound операции:

  • Сетевые запросы
  • Файловый ввод-вывод
  • Работа с БД
  • Ожидание событий

CPU-bound операции:

  • Математические вычисления
  • Обработка больших данных
  • Видеокодирование
  • Лучше использовать multiprocessing

Потокопулы (Thread Pools)

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_url(url: str) -> str:
    response = requests.get(url)
    return f"{url}: {len(response.content)} bytes"

urls = ["https://python.org", "https://github.com", "https://docs.python.org"]

with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(fetch_url, urls)
    for result in results:
        print(result)

Лучшие практики

  • Используйте Queue для обмена данными между потоками
  • Минимизируйте критические секции (lock)
  • Избегайте deadlock'ов: освобождайте блокировки в правильном порядке
  • Для CPU-bound используйте multiprocessing, для I/O-bound — threading
  • Тестируйте многопоточный код на race conditions

Распараллеривание потоков — эффективный способ обработки асинхронных I/O операций, но требует осторожности с синхронизацией.