← Назад к вопросам
Как сделать параллельность в потоках без процессов в Python?
2.0 Middle🔥 181 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как сделать параллельность в потоках без процессов в Python
Это важная тема: Python имеет GIL (Global Interpreter Lock), который препятствует истинному параллелизму в потоках для CPU-bound задач. Но для I/O-bound задач многопоточность очень полезна.
1. Проблема: GIL (Global Interpreter Lock)
import threading
import time
def cpu_bound_task(n):
"""CPU-интенсивная задача"""
result = 0
for i in range(n):
result += i
return result
# Тест 1: Однопоточное выполнение
start = time.time()
cpu_bound_task(50000000)
cpu_bound_task(50000000)
print(f"Sequential: {time.time() - start:.2f}s") # ~2s
# Тест 2: Многопоточное выполнение (с GIL)
start = time.time()
threads = [
threading.Thread(target=cpu_bound_task, args=(50000000,)),
threading.Thread(target=cpu_bound_task, args=(50000000,)),
]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Multi-threaded: {time.time() - start:.2f}s") # ~2.5s — МЕДЛЕННЕЕ!
Почему медленнее? GIL позволяет только одному потоку исполняться за раз в CPython. Переключение между потоками добавляет overhead.
Вывод: многопоточность НЕ помогает CPU-bound задачам в CPython.
2. Когда многопоточность помогает: I/O-bound задачи
import threading
import time
import requests
def fetch_url(url):
"""I/O-интенсивная задача"""
response = requests.get(url, timeout=5)
return response.status_code
# Тест 1: Однопоточно
urls = ["https://example.com"] * 10
start = time.time()
for url in urls:
fetch_url(url)
print(f"Sequential: {time.time() - start:.2f}s") # ~20s (каждый запрос ~2s)
# Тест 2: Многопоточно
start = time.time()
threads = []
for url in urls:
t = threading.Thread(target=fetch_url, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Multi-threaded: {time.time() - start:.2f}s") # ~2s — БЫСТРО!
Почему быстро? Когда поток ждёт I/O (сеть, диск), другой поток может исполняться. GIL отпускается при I/O операциях.
3. Threading для I/O-bound задач
import threading
import requests
from concurrent.futures import ThreadPoolExecutor
from typing import List
# Способ 1: Ручное управление потоками
class DownloadWorker(threading.Thread):
def __init__(self, url, result_list, index):
super().__init__()
self.url = url
self.result_list = result_list
self.index = index
self.daemon = False # Поток не умрёт при выходе главного
def run(self):
try:
response = requests.get(self.url, timeout=5)
self.result_list[self.index] = {
"url": self.url,
"status": response.status_code,
"length": len(response.content),
}
except Exception as e:
self.result_list[self.index] = {"error": str(e)}
# Использование
urls = ["https://example.com", "https://google.com", "https://github.com"]
results = [None] * len(urls)
threads = []
for i, url in enumerate(urls):
thread = DownloadWorker(url, results, i)
threads.append(thread)
thread.start()
# Ждём завершения
for thread in threads:
thread.join()
print(results)
# Способ 2: ThreadPoolExecutor (рекомендуется)
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_and_process(url):
response = requests.get(url, timeout=5)
return {"url": url, "length": len(response.content)}
# Пул из 5 потоков
with ThreadPoolExecutor(max_workers=5) as executor:
# map() — сохраняет порядок
results = list(executor.map(fetch_and_process, urls))
# Или с обработкой результатов по мере готовности
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(fetch_and_process, url): url for url in urls}
for future in as_completed(futures):
try:
result = future.result(timeout=5)
print(f"Completed: {result}")
except Exception as e:
print(f"Error: {e}")
4. Thread-safe данные: Lock и RLock
import threading
from threading import Lock, RLock
# Проблема: race condition
class Counter:
def __init__(self):
self.value = 0
def increment(self):
# Это не атомарная операция!
temp = self.value
temp += 1
self.value = temp
counter = Counter()
threads = []
for _ in range(100):
t = threading.Thread(target=lambda: [counter.increment() for _ in range(1000)])
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Expected: 100000, Got: {counter.value}") # Обычно < 100000!
# Решение: используй Lock
class ThreadSafeCounter:
def __init__(self):
self.value = 0
self.lock = Lock() # Обычный Lock
def increment(self):
with self.lock: # Критическая секция
temp = self.value
temp += 1
self.value = temp
# Или для рекурсивных блокировок
class RecursiveCounter:
def __init__(self):
self.value = 0
self.lock = RLock() # Может быть захвачен тем же потоком
def increment(self):
with self.lock:
self._do_increment()
def _do_increment(self):
with self.lock: # RLock позволяет повторный захват
self.value += 1
5. Синхронизация потоков: Event, Condition, Semaphore
import threading
from threading import Event, Condition, Semaphore
import time
# Event — простой флаг
class ProducerConsumer:
def __init__(self):
self.data = None
self.event = Event()
def producer(self):
for i in range(5):
time.sleep(0.5)
self.data = i
print(f"Produced: {i}")
self.event.set() # Сигнализируем что данные готовы
def consumer(self):
for _ in range(5):
self.event.wait() # Ждём сигнала
print(f"Consumed: {self.data}")
self.event.clear() # Очищаем флаг
pc = ProducerConsumer()
t1 = threading.Thread(target=pc.producer)
t2 = threading.Thread(target=pc.consumer)
t1.start()
t2.start()
t1.join()
t2.join()
# Condition — более мощный вариант (для сложных сценариев)
class Queue:
def __init__(self):
self.items = []
self.cond = Condition()
def put(self, item):
with self.cond:
self.items.append(item)
self.cond.notify_all() # Пробудить все ждущие потоки
def get(self):
with self.cond:
while not self.items: # while, не if!
self.cond.wait() # Ждём сигнала
return self.items.pop(0)
# Semaphore — счётчик доступа
class LimitedResource:
def __init__(self, max_users=2):
self.semaphore = Semaphore(max_users)
def use(self, user_id):
with self.semaphore:
print(f"User {user_id} using resource")
time.sleep(1)
print(f"User {user_id} done")
resource = LimitedResource(max_users=2)
threads = []
for i in range(5):
t = threading.Thread(target=resource.use, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
6. Queue для потокобезопасной передачи данных
import threading
from queue import Queue, PriorityQueue, LifoQueue
import time
# Queue (FIFO)
queue = Queue(maxsize=10)
def producer():
for i in range(5):
queue.put(f"Item {i}")
print(f"Produced: Item {i}")
time.sleep(0.1)
def consumer():
while True:
item = queue.get() # Блокируется если пуста
if item is None:
break
print(f"Consumed: {item}")
queue.task_done() # Отметить как выполненную
threads = [
threading.Thread(target=producer),
threading.Thread(target=consumer),
]
for t in threads:
t.start()
for t in threads:
t.join()
# PriorityQueue
prio_queue = PriorityQueue()
prio_queue.put((1, "High priority"))
prio_queue.put((3, "Low priority"))
prio_queue.put((2, "Medium priority"))
while not prio_queue.empty():
priority, item = prio_queue.get()
print(f"{priority}: {item}") # Выведет в порядке приоритета
# LIFO Queue (стек)
lifo = LifoQueue()
lifo.put(1)
lifo.put(2)
lifo.put(3)
print(lifo.get()) # 3
print(lifo.get()) # 2
print(lifo.get()) # 1
7. Asyncio вместо потоков (лучше для I/O)
import asyncio
import aiohttp
# Asyncio с await — современный подход
async def fetch_url(session, url):
async with session.get(url, timeout=5) as response:
return await response.text()
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Запуск
urls = ["https://example.com"] * 10
results = asyncio.run(fetch_all(urls))
# asyncio обычно быстрее и проще чем threading для I/O
8. Сравнение подходов
┌─────────────────────────────────────────────────────┐
│ Когда использовать что │
├─────────────────────────────────────────────────────┤
│ CPU-bound задачи: │
│ → multiprocessing (обходит GIL) │
│ │
│ I/O-bound задачи: │
│ → asyncio (самое быстрое и простое) │
│ → threading (простой подход) │
│ → многопроцессов (если asyncio сложный) │
│ │
│ Смешанные задачи: │
│ → concurrent.futures (простая абстракция) │
└─────────────────────────────────────────────────────┘
9. Практический пример: веб-скрейпер
import threading
import requests
from queue import Queue
import time
class WebScraper:
def __init__(self, num_workers=5):
self.queue = Queue()
self.results = []
self.lock = threading.Lock()
self.num_workers = num_workers
def worker(self):
while True:
url = self.queue.get()
if url is None:
break
try:
response = requests.get(url, timeout=5)
with self.lock:
self.results.append({
"url": url,
"status": response.status_code,
"length": len(response.content),
})
except Exception as e:
with self.lock:
self.results.append({"url": url, "error": str(e)})
finally:
self.queue.task_done()
def scrape(self, urls):
# Запуск рабочих потоков
threads = []
for _ in range(self.num_workers):
t = threading.Thread(target=self.worker)
t.start()
threads.append(t)
# Добавление работ
for url in urls:
self.queue.put(url)
# Ожидание завершения
self.queue.join()
# Остановка рабочих
for _ in range(self.num_workers):
self.queue.put(None)
for t in threads:
t.join()
return self.results
# Использование
scraper = WebScraper(num_workers=5)
urls = ["https://example.com"] * 10
results = scraper.scrape(urls)
print(results)
10. Лучшие практики
# 1. Используй context managers (with)
with lock:
# критическая секция
pass
# 2. Не держи lock дольше необходимого
with lock:
data = shared_resource # Быстро!
process_data(data) # Без lock
# 3. Избегай deadlock
# Плохо: захватываешь lock1, потом lock2
# Хорошо: всегда в одинаковом порядке
# 4. Используй ThreadPoolExecutor вместо ручного создания
# Плохо
for i in range(1000):
t = threading.Thread(...)
t.start()
# Хорошо
with ThreadPoolExecutor(max_workers=10) as executor:
executor.map(func, items)
# 5. Помни про daemon потоки
t = threading.Thread(target=func)
t.daemon = True # Завершится при выходе главного потока
t.start()
Ключевое правило
Для I/O-bound в Python:
- Сначала пробуй asyncio — самое быстро
- Если asyncio сложный → threading
- Последнее средство → multiprocessing
Для CPU-bound:
- Только multiprocessing (обходит GIL)
- Никогда не threading!
Помни: правильная выбор инструмента экономит часы отладки!