Какие знаешь инструменты запуска нескольких процессов в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Инструменты запуска нескольких процессов в Python
Есть несколько способов запустить несколько процессов:
1. multiprocessing.Pool — Самый практичный способ
Идеален для параллельной обработки данных.
import multiprocessing
import time
def task(n):
"""Вычислительная задача"""
time.sleep(1)
return n * n
if __name__ == '__main__':
# Создаём пул из 4 процессов
with multiprocessing.Pool(4) as pool:
# map: применяем функцию ко всем элементам
results = pool.map(task, range(10))
print(results) # [0, 1, 4, 9, 16, ...]
# imap: ленивая версия (экономит память)
for result in pool.imap(task, range(100)):
print(result)
# apply_async: асинхронный запрос
async_result = pool.apply_async(task, (5,))
result = async_result.get(timeout=5) # Ждём результат
print(result) # 25
Особенности:
- Автоматическое управление процессами
- Очередь задач (task queue)
- Балансировка нагрузки
- Контекст-менеджер (
with) автоматически закрывает пул
2. multiprocessing.Process — Низкоуровневый контроль
Для более сложных сценариев.
import multiprocessing
import time
def worker(name, queue):
"""Рабочий процесс"""
for i in range(5):
time.sleep(1)
queue.put(f"{name} выполнил итерацию {i}")
queue.put(None) # Сигнал окончания
if __name__ == '__main__':
queue = multiprocessing.Queue() # Очередь для общения
# Создаём 3 процесса
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(f"Worker-{i}", queue))
p.start()
processes.append(p)
# Получаем результаты
completed = 0
while completed < 3:
msg = queue.get() # Блокирует, пока нет данных
if msg is None:
completed += 1
else:
print(msg)
# Ждём завершения
for p in processes:
p.join() # Блокирует, пока процесс не завершится
print("Все процессы завершены")
Особенности:
- Полный контроль над процессами
- Можно запускать разные функции
- Общение через Queue, Pipe
- Нужно вручную управлять жизненным циклом
3. concurrent.futures.ProcessPoolExecutor — Современный способ
Аналог Pool, но современнее.
from concurrent.futures import ProcessPoolExecutor
import time
def task(n):
time.sleep(1)
return n * n
if __name__ == '__main__':
# Контекст-менеджер автоматически управляет пулом
with ProcessPoolExecutor(max_workers=4) as executor:
# map: применяем функцию
results = executor.map(task, range(10))
print(list(results)) # [0, 1, 4, 9, 16, ...]
# submit: отправляем одну задачу
future = executor.submit(task, 5)
result = future.result(timeout=5)
print(result) # 25
# Ждём первый готовый результат
from concurrent.futures import as_completed
futures = [executor.submit(task, i) for i in range(5)]
for future in as_completed(futures):
print(f"Готов результат: {future.result()}")
Преимущества над Pool:
- Возвращает Future объекты (удобнее)
- Единый интерфейс (ProcessPoolExecutor, ThreadPoolExecutor, можно менять)
- Современный API
4. multiprocessing.Manager — Общее состояние между процессами
Для синхронизации данных между процессами.
import multiprocessing
def worker(shared_list, shared_dict, name):
"""Рабочий может изменять общие структуры"""
shared_list.append(f"{name} добавил элемент")
shared_dict[name] = {'status': 'completed', 'items': len(shared_list)}
if __name__ == '__main__':
# Manager создаёт общие структуры
manager = multiprocessing.Manager()
shared_list = manager.list()
shared_dict = manager.dict()
# Запускаем процессы
processes = []
for i in range(3):
p = multiprocessing.Process(
target=worker,
args=(shared_list, shared_dict, f"Worker-{i}")
)
p.start()
processes.append(p)
# Ждём
for p in processes:
p.join()
# Видим общие данные
print(list(shared_list))
# ['Worker-0 добавил элемент', 'Worker-1 добавил элемент', ...]
print(dict(shared_dict))
# {'Worker-0': {'status': 'completed', 'items': 3}, ...}
Использование: синхронизация счётчиков, логирования, статуса между процессами.
5. subprocess — Запуск внешних программ
Для запуска других скриптов или команд ОС.
import subprocess
import concurrent.futures
# Запустить внешний скрипт
result = subprocess.run(
['python', 'external_script.py', '--arg', 'value'],
capture_output=True,
text=True
)
print(result.stdout)
print(result.returncode)
# Запустить несколько команд параллельно
def run_command(cmd):
result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
return result.stdout
with concurrent.futures.ProcessPoolExecutor() as executor:
commands = [
'echo "Hello from 1"',
'echo "Hello from 2"',
'echo "Hello from 3"',
]
results = executor.map(run_command, commands)
for r in results:
print(r.strip())
6. asyncio — Асинхронное программирование (не процессы, а корутины)
Для I/O-bound задач без создания процессов.
import asyncio
import aiohttp
async def fetch_url(session, url):
"""Загружаем URL асинхронно"""
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
'http://example.com',
'http://google.com',
'http://github.com',
]
async with aiohttp.ClientSession() as session:
# gather запускает все корутины параллельно
results = await asyncio.gather(
*[fetch_url(session, url) for url in urls]
)
print(len(results))
# Запуск
asyncio.run(main())
Преимущество: не создаёт новые процессы, использует один процесс с переключением контекста.
7. celery + Redis — Распределённая обработка
Для больших приложений с микросервисами.
from celery import Celery
import time
app = Celery('tasks', broker='redis://localhost:6379')
@app.task
def add(x, y):
"""Асинхронная задача"""
time.sleep(2)
return x + y
# Отправляем в очередь (не ждём результат)
task = add.delay(4, 6)
# Позже получаем результат
result = task.get(timeout=10) # 10
# Несколько задач
results = [add.delay(i, i+1) for i in range(5)]
for result in results:
print(result.get())
Преимущества:
- Распределённая обработка (процессы на разных машинах)
- Очередь задач
- Масштабируемость
- Переиспользование рабочих процессов
Сравнительная таблица
| Инструмент | Тип | Когда использовать | Сложность |
|---|---|---|---|
| Pool | Процессы | Простая параллельная обработка | Низкая |
| Process | Процессы | Сложное управление жизненным циклом | Средняя |
| ProcessPoolExecutor | Процессы | Современный способ | Низкая |
| Manager | Процессы + синхр. | Общее состояние между процессами | Средняя |
| subprocess | Внешние программы | Запуск других скриптов/команд | Низкая |
| asyncio | Корутины (один процесс) | I/O-bound, сетевые запросы | Средняя |
| celery | Распределённая обработка | Микросервисы, масштабирование | Высокая |
На собеседовании
"Для простой параллельной обработки — multiprocessing.Pool. Для более сложных сценариев — concurrent.futures.ProcessPoolExecutor. Для I/O-bound задач без создания процессов — asyncio. Для микросервисов и распределённой обработки — celery. Главное помнить, что процессы дорогие (память, создание), поэтому для I/O используй asyncio (одна корутина на I/O ждёт, другие выполняются)."
Практический пример: обработка списка файлов
from concurrent.futures import ProcessPoolExecutor
import os
def process_file(filepath):
"""Обработка одного файла"""
with open(filepath, 'r') as f:
lines = f.readlines()
return len(lines)
if __name__ == '__main__':
files = [f for f in os.listdir('.') if f.endswith('.txt')]
# Параллельно обрабатываем все файлы
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(process_file, files)
for filepath, line_count in zip(files, results):
print(f"{filepath}: {line_count} строк")