Что такое распараллеривание потоков?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Распараллеривание потоков (Thread Parallelization)
Распараллеривание потоков — это техника, позволяющая программе одновременно выполнять несколько задач в рамках одного процесса, используя несколько потоков (threads). Каждый поток имеет свой стек вызовов и счётчик команд, но все потоки одного процесса работают с общей памятью.
Различие: потоки vs процессы
- Потоки (Threads): лёгкие, быстрое переключение, общая память
- Процессы (Processes): тяжёлые, изолированная память, медленнее
Основные концепции
1. Создание потоков
import threading
import time
from typing import Any
def worker(name: str, delay: int) -> None:
"""Функция для выполнения в отдельном потоке"""
for i in range(3):
time.sleep(delay)
print(f"{name}: итерация {i}")
# Способ 1: Простой поток
thread1 = threading.Thread(target=worker, args=("Поток-1", 1))
thread2 = threading.Thread(target=worker, args=("Поток-2", 0.5))
# Запуск потоков
thread1.start()
thread2.start()
# Ожидание завершения
thread1.join()
thread2.join()
print("Все потоки завершены")
2. Класс-подкласс Thread
class MyThread(threading.Thread):
def __init__(self, name: str, count: int):
super().__init__()
self.name = name
self.count = count
def run(self) -> None:
"""Переопределяем метод run"""
for i in range(self.count):
print(f"{self.name}: {i}")
time.sleep(0.1)
thread = MyThread("CustomThread", 5)
thread.start()
thread.join()
3. Демон-потоки
Демон-потоки автоматически завершаются при выходе основной программы:
def background_task():
while True:
print("Фоновая задача работает...")
time.sleep(1)
thread = threading.Thread(target=background_task, daemon=True)
thread.start()
time.sleep(3)
print("Программа завершается")
# Демон-поток автоматически остановится
Синхронизация потоков
1. Lock (Мьютекс)
Предотвращает одновременный доступ нескольких потоков к ресурсу:
lock = threading.Lock()
counter = 0
def increment():
global counter
for _ in range(100000):
with lock: # Критическая секция
counter += 1
threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Counter: {counter}") # 500000 (корректно)
2. RLock (Рекурсивная блокировка)
Позволяет одному потоку много раз получить одну блокировку:
rlock = threading.RLock()
def function_a():
with rlock:
print("A: критическая секция")
function_b()
def function_b():
with rlock: # Один поток может получить RLock множество раз
print("B: критическая секция")
thread = threading.Thread(target=function_a)
thread.start()
thread.join()
3. Semaphore (Семафор)
Ограничивает количество потоков, одновременно заходящих в ресурс:
sem = threading.Semaphore(2) # Максимум 2 потока одновременно
def access_resource(id: int):
with sem:
print(f"Поток {id} использует ресурс")
time.sleep(1)
print(f"Поток {id} освобождает ресурс")
threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
4. Event (Событие)
Один поток может сигнализировать, а другие ждать события:
event = threading.Event()
def waiter(name: str):
print(f"{name}: ожидает событие")
event.wait() # Блокируется до set()
print(f"{name}: событие произошло!")
def setter():
time.sleep(2)
print("Setter: генерирует событие")
event.set()
threads = [threading.Thread(target=waiter, args=(f"Waiter-{i}",)) for i in range(3)]
threads.append(threading.Thread(target=setter))
for t in threads:
t.start()
for t in threads:
t.join()
5. Condition (Условная переменная)
Для координации нескольких потоков на основе условия:
condition = threading.Condition()
resource = []
def producer():
for i in range(5):
with condition:
resource.append(i)
print(f"Производитель: добавил {i}")
condition.notify_all() # Уведомляем потребителей
time.sleep(0.5)
def consumer(name: str):
while True:
with condition:
while not resource: # Проверяем условие
print(f"{name}: ждёт данные")
condition.wait() # Ждём уведомления
if resource:
item = resource.pop(0)
print(f"{name}: получил {item}")
producer_thread = threading.Thread(target=producer)
consumer_threads = [threading.Thread(target=consumer, args=(f"Consumer-{i}",), daemon=True) for i in range(2)]
producer_thread.start()
for t in consumer_threads:
t.start()
producer_thread.join()
time.sleep(1)
Queue (Очередь потокобезопасная)
Идеальна для обмена данными между потоками:
from queue import Queue
queue = Queue(maxsize=5)
def producer():
for i in range(10):
queue.put(i)
print(f"Произведено: {i}")
time.sleep(0.2)
def consumer(id: int):
while True:
item = queue.get()
print(f"Потребитель {id} получил: {item}")
time.sleep(0.5)
queue.task_done()
producer_thread = threading.Thread(target=producer)
consumer_threads = [threading.Thread(target=consumer, args=(i,), daemon=True) for i in range(3)]
producer_thread.start()
for t in consumer_threads:
t.start()
producer_thread.join()
queue.join() # Ждёт завершения всех задач
GIL (Global Interpreter Lock)
Важно знать: Python имеет GIL, который предотвращает истинный параллелизм потоков при использовании CPU-bound операций:
import threading
import time
def cpu_bound_task(n: int) -> int:
"""Тяжёлая операция"""
result = 0
for i in range(50000000):
result += i
return result
# Один поток
start = time.time()
cpu_bound_task(1)
print(f"Один поток: {time.time() - start:.2f}s")
# Два потока (медленнее из-за GIL!)
start = time.time()
threads = [threading.Thread(target=cpu_bound_task, args=(1,)) for _ in range(2)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Два потока: {time.time() - start:.2f}s") # Медленнее!
# Решение: использовать multiprocessing для CPU-bound
from multiprocessing import Pool
start = time.time()
with Pool(2) as p:
p.map(cpu_bound_task, [1, 1])
print(f"Два процесса: {time.time() - start:.2f}s") # Быстрее!
Когда использовать потоки
✅ I/O-bound операции:
- Сетевые запросы
- Файловый ввод-вывод
- Работа с БД
- Ожидание событий
❌ CPU-bound операции:
- Математические вычисления
- Обработка больших данных
- Видеокодирование
- Лучше использовать
multiprocessing
Потокопулы (Thread Pools)
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch_url(url: str) -> str:
response = requests.get(url)
return f"{url}: {len(response.content)} bytes"
urls = ["https://python.org", "https://github.com", "https://docs.python.org"]
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(fetch_url, urls)
for result in results:
print(result)
Лучшие практики
- Используйте
Queueдля обмена данными между потоками - Минимизируйте критические секции (lock)
- Избегайте deadlock'ов: освобождайте блокировки в правильном порядке
- Для CPU-bound используйте
multiprocessing, для I/O-bound —threading - Тестируйте многопоточный код на race conditions
Распараллеривание потоков — эффективный способ обработки асинхронных I/O операций, но требует осторожности с синхронизацией.