В каких ситуациях используют многопроцессинг (multiprocessing)
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Ситуации для использования многопроцессинга (multiprocessing)
Многопроцессинг позволяет создавать несколько независимых процессов, которые могут выполняться параллельно на разных ядрах процессора. Это очень отличается от многопоточности (threading). Давайте разберёмся, когда и почему использовать многопроцессинг.
Основная проблема: GIL (Global Interpreter Lock)
import threading
import time
def cpu_intensive_task():
"""Требовательная к CPU функция"""
total = 0
for i in range(100_000_000):
total += i
return total
# Попытка с потоками (НЕ поможет)
start = time.time()
threads = []
for _ in range(4):
t = threading.Thread(target=cpu_intensive_task)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Threading: {time.time() - start:.2f}s") # ~20 сек (медленнее, чем одиночное выполнение!)
# Правильное решение: многопроцессинг
from multiprocessing import Process
start = time.time()
processes = []
for _ in range(4):
p = Process(target=cpu_intensive_task)
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"Multiprocessing: {time.time() - start:.2f}s") # ~5 сек (в 4 раза быстрее!)
В Python есть GIL (Global Interpreter Lock), который позволяет только одному потоку выполнять Python код одновременно. Многопроцессинг обходит это ограничение.
Когда использовать многопроцессинг
1. Требовательные к CPU задачи (CPU-bound tasks)
from multiprocessing import Pool
import time
def fibonacci(n):
"""Требовательная к CPU функция"""
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
if __name__ == '__main__':
numbers = [30, 31, 32, 33, 34]
# Последовательное выполнение
start = time.time()
results = [fibonacci(n) for n in numbers]
print(f"Sequential: {time.time() - start:.2f}s") # ~10 сек
# Параллельное выполнение на 4 процессах
start = time.time()
with Pool(4) as pool:
results = pool.map(fibonacci, numbers)
print(f"Multiprocessing: {time.time() - start:.2f}s") # ~3 сек
2. Обработка больших данных
from multiprocessing import Pool
import numpy as np
def process_chunk(data_chunk):
"""Обработка части данных"""
# Сложные вычисления
return np.sum(data_chunk ** 2)
if __name__ == '__main__':
# 100 миллионов точек данных
large_data = np.random.randn(100_000_000)
# Разбиваем на чанки
chunks = np.array_split(large_data, 4)
# Обрабатываем в параллель
with Pool(4) as pool:
results = pool.map(process_chunk, chunks)
final_result = sum(results)
print(final_result)
3. Машинное обучение и анализ данных
from multiprocessing import Pool
from sklearn.ensemble import RandomForestClassifier
import numpy as np
def train_model(fold_data):
"""Обучение модели на одном фолде"""
X_train, y_train, X_test, y_test = fold_data
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)
return model.score(X_test, y_test)
if __name__ == '__main__':
# 5-fold кроссвалидация
folds = [...] # Подготовленные фолды
# Обучаем модели параллельно
with Pool(5) as pool:
scores = pool.map(train_model, folds)
print(f"Average score: {np.mean(scores):.3f}")
4. Обработка файлов
from multiprocessing import Pool
import os
from pathlib import Path
def process_image(filepath):
"""Обработка одного изображения"""
from PIL import Image
img = Image.open(filepath)
# Изменение размера, фильтры и т.д.
img_resized = img.resize((256, 256))
output = filepath.replace('.jpg', '_processed.jpg')
img_resized.save(output)
return output
if __name__ == '__main__':
image_files = list(Path('images/').glob('*.jpg'))
# Обработка 1000 изображений на 4 процессах
with Pool(4) as pool:
results = pool.map(process_image, image_files)
print(f"Processed {len(results)} images")
5. Запуск независимых программ
from multiprocessing import Process
import subprocess
def run_script(script_name):
"""Запуск скрипта в отдельном процессе"""
result = subprocess.run(['python', script_name], capture_output=True)
return result.stdout.decode()
if __name__ == '__main__':
scripts = ['script1.py', 'script2.py', 'script3.py', 'script4.py']
processes = []
for script in scripts:
p = Process(target=run_script, args=(script,))
p.start()
processes.append(p)
for p in processes:
p.join()
6. Обработка очередей данных
from multiprocessing import Queue, Process
from queue import Empty
import time
def worker(input_queue, output_queue, worker_id):
"""Рабочий процесс"""
while True:
try:
task = input_queue.get(timeout=1)
if task is None: # Сигнал завершения
break
result = task ** 2
output_queue.put((worker_id, result))
except Empty:
continue
if __name__ == '__main__':
input_q = Queue()
output_q = Queue()
# Создаём 4 рабочих процесса
workers = []
for i in range(4):
p = Process(target=worker, args=(input_q, output_q, i))
p.start()
workers.append(p)
# Добавляем задачи
for i in range(20):
input_q.put(i)
# Сигнал завершения
for _ in range(4):
input_q.put(None)
# Собираем результаты
results = []
while len(results) < 20:
try:
worker_id, result = output_q.get(timeout=1)
results.append(result)
print(f"Worker {worker_id}: {result}")
except Empty:
pass
# Ждём завершения всех рабочих
for p in workers:
p.join()
Сравнение многопроцессинга и многопоточности
Характеристика | Threading | Multiprocessing
| |
————————————————————|———————————————————|—————————————————
CPU-bound задачи | ✗ Медленно (GIL) | ✓ Быстро
I/O задачи | ✓ Быстро | ✓ Быстро
Оверхеад | Низкий | Высокий
Память | Меньше | Больше (на процесс)
Общие данные | Легко делиться | Сложно (копируется)
Отлаживание | Легче | Сложнее
Опасность deadlock | Да | Нет
Практический пример: обработка больших файлов логов
from multiprocessing import Pool
from pathlib import Path
from collections import Counter
def analyze_log_file(filepath):
"""Анализ одного файла логов"""
error_counts = Counter()
with open(filepath, 'r') as f:
for line in f:
if 'ERROR' in line:
error_counts['ERROR'] += 1
elif 'WARNING' in line:
error_counts['WARNING'] += 1
return dict(error_counts)
if __name__ == '__main__':
log_files = list(Path('logs/').glob('*.log'))
# Анализируем логи параллельно
with Pool(8) as pool:
results = pool.map(analyze_log_file, log_files)
# Объединяем результаты
total_errors = Counter()
for result in results:
for key, value in result.items():
total_errors[key] += value
print(f"Total errors: {total_errors}")
Когда НЕ использовать многопроцессинг
# ❌ НЕ используй для I/O операций
# Используй asyncio или threading вместо этого
from multiprocessing import Pool
import requests
def fetch_url(url):
response = requests.get(url) # I/O операция
return response.text
urls = ['http://example.com'] * 100
with Pool(4) as pool:
results = pool.map(fetch_url, urls) # Неэффективно!
# ✓ ПРАВИЛЬНО: используй asyncio
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
asyncio.run(main())
Итоговые рекомендации
Используй многопроцессинг для:
- CPU-bound операций (математика, обработка данных, ML)
- Обработки больших объёмов данных
- Запуска независимых задач
Используй многопоточность (threading) для:
- I/O операций (сетевые запросы, чтение файлов)
- Простых задач с малым перекрытием
Используй asyncio для:
- Асинхронных I/O операций
- Когда нужна высокая производительность с малым потреблением памяти
Многопроцессинг в Python — это мощный инструмент для параллельной обработки CPU-интенсивных задач, но помни про гарант пирамиды: сначала попробуй обычное последовательное выполнение, потом многопоточность/asyncio для I/O, и только потом многопроцессинг, если это действительно необходимо.