← Назад к вопросам

Как сделать параллельность в потоках без процессов в Python?

2.0 Middle🔥 181 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как сделать параллельность в потоках без процессов в Python

Это важная тема: Python имеет GIL (Global Interpreter Lock), который препятствует истинному параллелизму в потоках для CPU-bound задач. Но для I/O-bound задач многопоточность очень полезна.

1. Проблема: GIL (Global Interpreter Lock)

import threading
import time

def cpu_bound_task(n):
    """CPU-интенсивная задача"""
    result = 0
    for i in range(n):
        result += i
    return result

# Тест 1: Однопоточное выполнение
start = time.time()
cpu_bound_task(50000000)
cpu_bound_task(50000000)
print(f"Sequential: {time.time() - start:.2f}s")  # ~2s

# Тест 2: Многопоточное выполнение (с GIL)
start = time.time()
threads = [
    threading.Thread(target=cpu_bound_task, args=(50000000,)),
    threading.Thread(target=cpu_bound_task, args=(50000000,)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Multi-threaded: {time.time() - start:.2f}s")  # ~2.5s — МЕДЛЕННЕЕ!

Почему медленнее? GIL позволяет только одному потоку исполняться за раз в CPython. Переключение между потоками добавляет overhead.

Вывод: многопоточность НЕ помогает CPU-bound задачам в CPython.

2. Когда многопоточность помогает: I/O-bound задачи

import threading
import time
import requests

def fetch_url(url):
    """I/O-интенсивная задача"""
    response = requests.get(url, timeout=5)
    return response.status_code

# Тест 1: Однопоточно
urls = ["https://example.com"] * 10
start = time.time()
for url in urls:
    fetch_url(url)
print(f"Sequential: {time.time() - start:.2f}s")  # ~20s (каждый запрос ~2s)

# Тест 2: Многопоточно
start = time.time()
threads = []
for url in urls:
    t = threading.Thread(target=fetch_url, args=(url,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"Multi-threaded: {time.time() - start:.2f}s")  # ~2s — БЫСТРО!

Почему быстро? Когда поток ждёт I/O (сеть, диск), другой поток может исполняться. GIL отпускается при I/O операциях.

3. Threading для I/O-bound задач

import threading
import requests
from concurrent.futures import ThreadPoolExecutor
from typing import List

# Способ 1: Ручное управление потоками
class DownloadWorker(threading.Thread):
    def __init__(self, url, result_list, index):
        super().__init__()
        self.url = url
        self.result_list = result_list
        self.index = index
        self.daemon = False  # Поток не умрёт при выходе главного
    
    def run(self):
        try:
            response = requests.get(self.url, timeout=5)
            self.result_list[self.index] = {
                "url": self.url,
                "status": response.status_code,
                "length": len(response.content),
            }
        except Exception as e:
            self.result_list[self.index] = {"error": str(e)}

# Использование
urls = ["https://example.com", "https://google.com", "https://github.com"]
results = [None] * len(urls)
threads = []

for i, url in enumerate(urls):
    thread = DownloadWorker(url, results, i)
    threads.append(thread)
    thread.start()

# Ждём завершения
for thread in threads:
    thread.join()

print(results)

# Способ 2: ThreadPoolExecutor (рекомендуется)
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_and_process(url):
    response = requests.get(url, timeout=5)
    return {"url": url, "length": len(response.content)}

# Пул из 5 потоков
with ThreadPoolExecutor(max_workers=5) as executor:
    # map() — сохраняет порядок
    results = list(executor.map(fetch_and_process, urls))
    
# Или с обработкой результатов по мере готовности
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(fetch_and_process, url): url for url in urls}
    
    for future in as_completed(futures):
        try:
            result = future.result(timeout=5)
            print(f"Completed: {result}")
        except Exception as e:
            print(f"Error: {e}")

4. Thread-safe данные: Lock и RLock

import threading
from threading import Lock, RLock

# Проблема: race condition
class Counter:
    def __init__(self):
        self.value = 0
    
    def increment(self):
        # Это не атомарная операция!
        temp = self.value
        temp += 1
        self.value = temp

counter = Counter()
threads = []

for _ in range(100):
    t = threading.Thread(target=lambda: [counter.increment() for _ in range(1000)])
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Expected: 100000, Got: {counter.value}")  # Обычно < 100000!

# Решение: используй Lock
class ThreadSafeCounter:
    def __init__(self):
        self.value = 0
        self.lock = Lock()  # Обычный Lock
    
    def increment(self):
        with self.lock:  # Критическая секция
            temp = self.value
            temp += 1
            self.value = temp

# Или для рекурсивных блокировок
class RecursiveCounter:
    def __init__(self):
        self.value = 0
        self.lock = RLock()  # Может быть захвачен тем же потоком
    
    def increment(self):
        with self.lock:
            self._do_increment()
    
    def _do_increment(self):
        with self.lock:  # RLock позволяет повторный захват
            self.value += 1

5. Синхронизация потоков: Event, Condition, Semaphore

import threading
from threading import Event, Condition, Semaphore
import time

# Event — простой флаг
class ProducerConsumer:
    def __init__(self):
        self.data = None
        self.event = Event()
    
    def producer(self):
        for i in range(5):
            time.sleep(0.5)
            self.data = i
            print(f"Produced: {i}")
            self.event.set()  # Сигнализируем что данные готовы
    
    def consumer(self):
        for _ in range(5):
            self.event.wait()  # Ждём сигнала
            print(f"Consumed: {self.data}")
            self.event.clear()  # Очищаем флаг

pc = ProducerConsumer()
t1 = threading.Thread(target=pc.producer)
t2 = threading.Thread(target=pc.consumer)
t1.start()
t2.start()
t1.join()
t2.join()

# Condition — более мощный вариант (для сложных сценариев)
class Queue:
    def __init__(self):
        self.items = []
        self.cond = Condition()
    
    def put(self, item):
        with self.cond:
            self.items.append(item)
            self.cond.notify_all()  # Пробудить все ждущие потоки
    
    def get(self):
        with self.cond:
            while not self.items:  # while, не if!
                self.cond.wait()    # Ждём сигнала
            return self.items.pop(0)

# Semaphore — счётчик доступа
class LimitedResource:
    def __init__(self, max_users=2):
        self.semaphore = Semaphore(max_users)
    
    def use(self, user_id):
        with self.semaphore:
            print(f"User {user_id} using resource")
            time.sleep(1)
            print(f"User {user_id} done")

resource = LimitedResource(max_users=2)
threads = []
for i in range(5):
    t = threading.Thread(target=resource.use, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

6. Queue для потокобезопасной передачи данных

import threading
from queue import Queue, PriorityQueue, LifoQueue
import time

# Queue (FIFO)
queue = Queue(maxsize=10)

def producer():
    for i in range(5):
        queue.put(f"Item {i}")
        print(f"Produced: Item {i}")
        time.sleep(0.1)

def consumer():
    while True:
        item = queue.get()  # Блокируется если пуста
        if item is None:
            break
        print(f"Consumed: {item}")
        queue.task_done()  # Отметить как выполненную

threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer),
]

for t in threads:
    t.start()

for t in threads:
    t.join()

# PriorityQueue
prio_queue = PriorityQueue()
prio_queue.put((1, "High priority"))
prio_queue.put((3, "Low priority"))
prio_queue.put((2, "Medium priority"))

while not prio_queue.empty():
    priority, item = prio_queue.get()
    print(f"{priority}: {item}")  # Выведет в порядке приоритета

# LIFO Queue (стек)
lifo = LifoQueue()
lifo.put(1)
lifo.put(2)
lifo.put(3)
print(lifo.get())  # 3
print(lifo.get())  # 2
print(lifo.get())  # 1

7. Asyncio вместо потоков (лучше для I/O)

import asyncio
import aiohttp

# Asyncio с await — современный подход
async def fetch_url(session, url):
    async with session.get(url, timeout=5) as response:
        return await response.text()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Запуск
urls = ["https://example.com"] * 10
results = asyncio.run(fetch_all(urls))

# asyncio обычно быстрее и проще чем threading для I/O

8. Сравнение подходов

┌─────────────────────────────────────────────────────┐
│ Когда использовать что                               │
├─────────────────────────────────────────────────────┤
│ CPU-bound задачи:                                   │
│   → multiprocessing (обходит GIL)                  │
│                                                     │
│ I/O-bound задачи:                                   │
│   → asyncio (самое быстрое и простое)              │
│   → threading (простой подход)                     │
│   → многопроцессов (если asyncio сложный)         │
│                                                     │
│ Смешанные задачи:                                   │
│   → concurrent.futures (простая абстракция)        │
└─────────────────────────────────────────────────────┘

9. Практический пример: веб-скрейпер

import threading
import requests
from queue import Queue
import time

class WebScraper:
    def __init__(self, num_workers=5):
        self.queue = Queue()
        self.results = []
        self.lock = threading.Lock()
        self.num_workers = num_workers
    
    def worker(self):
        while True:
            url = self.queue.get()
            if url is None:
                break
            
            try:
                response = requests.get(url, timeout=5)
                with self.lock:
                    self.results.append({
                        "url": url,
                        "status": response.status_code,
                        "length": len(response.content),
                    })
            except Exception as e:
                with self.lock:
                    self.results.append({"url": url, "error": str(e)})
            finally:
                self.queue.task_done()
    
    def scrape(self, urls):
        # Запуск рабочих потоков
        threads = []
        for _ in range(self.num_workers):
            t = threading.Thread(target=self.worker)
            t.start()
            threads.append(t)
        
        # Добавление работ
        for url in urls:
            self.queue.put(url)
        
        # Ожидание завершения
        self.queue.join()
        
        # Остановка рабочих
        for _ in range(self.num_workers):
            self.queue.put(None)
        
        for t in threads:
            t.join()
        
        return self.results

# Использование
scraper = WebScraper(num_workers=5)
urls = ["https://example.com"] * 10
results = scraper.scrape(urls)
print(results)

10. Лучшие практики

# 1. Используй context managers (with)
with lock:
    # критическая секция
    pass

# 2. Не держи lock дольше необходимого
with lock:
    data = shared_resource  # Быстро!
process_data(data)  # Без lock

# 3. Избегай deadlock
# Плохо: захватываешь lock1, потом lock2
# Хорошо: всегда в одинаковом порядке

# 4. Используй ThreadPoolExecutor вместо ручного создания
# Плохо
for i in range(1000):
    t = threading.Thread(...)
    t.start()

# Хорошо
with ThreadPoolExecutor(max_workers=10) as executor:
    executor.map(func, items)

# 5. Помни про daemon потоки
t = threading.Thread(target=func)
t.daemon = True  # Завершится при выходе главного потока
t.start()

Ключевое правило

Для I/O-bound в Python:

  1. Сначала пробуй asyncio — самое быстро
  2. Если asyncio сложный → threading
  3. Последнее средство → multiprocessing

Для CPU-bound:

  • Только multiprocessing (обходит GIL)
  • Никогда не threading!

Помни: правильная выбор инструмента экономит часы отладки!

Как сделать параллельность в потоках без процессов в Python? | PrepBro