← Назад к вопросам

Какие знаешь инструменты запуска нескольких процессов в Python?

2.0 Middle🔥 181 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Инструменты запуска нескольких процессов в Python

Есть несколько способов запустить несколько процессов:

1. multiprocessing.Pool — Самый практичный способ

Идеален для параллельной обработки данных.

import multiprocessing
import time

def task(n):
    """Вычислительная задача"""
    time.sleep(1)
    return n * n

if __name__ == '__main__':
    # Создаём пул из 4 процессов
    with multiprocessing.Pool(4) as pool:
        # map: применяем функцию ко всем элементам
        results = pool.map(task, range(10))
        print(results)  # [0, 1, 4, 9, 16, ...]
        
        # imap: ленивая версия (экономит память)
        for result in pool.imap(task, range(100)):
            print(result)
        
        # apply_async: асинхронный запрос
        async_result = pool.apply_async(task, (5,))
        result = async_result.get(timeout=5)  # Ждём результат
        print(result)  # 25

Особенности:

  • Автоматическое управление процессами
  • Очередь задач (task queue)
  • Балансировка нагрузки
  • Контекст-менеджер (with) автоматически закрывает пул

2. multiprocessing.Process — Низкоуровневый контроль

Для более сложных сценариев.

import multiprocessing
import time

def worker(name, queue):
    """Рабочий процесс"""
    for i in range(5):
        time.sleep(1)
        queue.put(f"{name} выполнил итерацию {i}")
    queue.put(None)  # Сигнал окончания

if __name__ == '__main__':
    queue = multiprocessing.Queue()  # Очередь для общения
    
    # Создаём 3 процесса
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(f"Worker-{i}", queue))
        p.start()
        processes.append(p)
    
    # Получаем результаты
    completed = 0
    while completed < 3:
        msg = queue.get()  # Блокирует, пока нет данных
        if msg is None:
            completed += 1
        else:
            print(msg)
    
    # Ждём завершения
    for p in processes:
        p.join()  # Блокирует, пока процесс не завершится
    
    print("Все процессы завершены")

Особенности:

  • Полный контроль над процессами
  • Можно запускать разные функции
  • Общение через Queue, Pipe
  • Нужно вручную управлять жизненным циклом

3. concurrent.futures.ProcessPoolExecutor — Современный способ

Аналог Pool, но современнее.

from concurrent.futures import ProcessPoolExecutor
import time

def task(n):
    time.sleep(1)
    return n * n

if __name__ == '__main__':
    # Контекст-менеджер автоматически управляет пулом
    with ProcessPoolExecutor(max_workers=4) as executor:
        # map: применяем функцию
        results = executor.map(task, range(10))
        print(list(results))  # [0, 1, 4, 9, 16, ...]
        
        # submit: отправляем одну задачу
        future = executor.submit(task, 5)
        result = future.result(timeout=5)
        print(result)  # 25
        
        # Ждём первый готовый результат
        from concurrent.futures import as_completed
        futures = [executor.submit(task, i) for i in range(5)]
        for future in as_completed(futures):
            print(f"Готов результат: {future.result()}")

Преимущества над Pool:

  • Возвращает Future объекты (удобнее)
  • Единый интерфейс (ProcessPoolExecutor, ThreadPoolExecutor, можно менять)
  • Современный API

4. multiprocessing.Manager — Общее состояние между процессами

Для синхронизации данных между процессами.

import multiprocessing

def worker(shared_list, shared_dict, name):
    """Рабочий может изменять общие структуры"""
    shared_list.append(f"{name} добавил элемент")
    shared_dict[name] = {'status': 'completed', 'items': len(shared_list)}

if __name__ == '__main__':
    # Manager создаёт общие структуры
    manager = multiprocessing.Manager()
    shared_list = manager.list()
    shared_dict = manager.dict()
    
    # Запускаем процессы
    processes = []
    for i in range(3):
        p = multiprocessing.Process(
            target=worker,
            args=(shared_list, shared_dict, f"Worker-{i}")
        )
        p.start()
        processes.append(p)
    
    # Ждём
    for p in processes:
        p.join()
    
    # Видим общие данные
    print(list(shared_list))
    # ['Worker-0 добавил элемент', 'Worker-1 добавил элемент', ...]
    print(dict(shared_dict))
    # {'Worker-0': {'status': 'completed', 'items': 3}, ...}

Использование: синхронизация счётчиков, логирования, статуса между процессами.

5. subprocess — Запуск внешних программ

Для запуска других скриптов или команд ОС.

import subprocess
import concurrent.futures

# Запустить внешний скрипт
result = subprocess.run(
    ['python', 'external_script.py', '--arg', 'value'],
    capture_output=True,
    text=True
)
print(result.stdout)
print(result.returncode)

# Запустить несколько команд параллельно
def run_command(cmd):
    result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
    return result.stdout

with concurrent.futures.ProcessPoolExecutor() as executor:
    commands = [
        'echo "Hello from 1"',
        'echo "Hello from 2"',
        'echo "Hello from 3"',
    ]
    results = executor.map(run_command, commands)
    for r in results:
        print(r.strip())

6. asyncio — Асинхронное программирование (не процессы, а корутины)

Для I/O-bound задач без создания процессов.

import asyncio
import aiohttp

async def fetch_url(session, url):
    """Загружаем URL асинхронно"""
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'http://example.com',
        'http://google.com',
        'http://github.com',
    ]
    
    async with aiohttp.ClientSession() as session:
        # gather запускает все корутины параллельно
        results = await asyncio.gather(
            *[fetch_url(session, url) for url in urls]
        )
        print(len(results))

# Запуск
asyncio.run(main())

Преимущество: не создаёт новые процессы, использует один процесс с переключением контекста.

7. celery + Redis — Распределённая обработка

Для больших приложений с микросервисами.

from celery import Celery
import time

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def add(x, y):
    """Асинхронная задача"""
    time.sleep(2)
    return x + y

# Отправляем в очередь (не ждём результат)
task = add.delay(4, 6)

# Позже получаем результат
result = task.get(timeout=10)  # 10

# Несколько задач
results = [add.delay(i, i+1) for i in range(5)]
for result in results:
    print(result.get())

Преимущества:

  • Распределённая обработка (процессы на разных машинах)
  • Очередь задач
  • Масштабируемость
  • Переиспользование рабочих процессов

Сравнительная таблица

ИнструментТипКогда использоватьСложность
PoolПроцессыПростая параллельная обработкаНизкая
ProcessПроцессыСложное управление жизненным цикломСредняя
ProcessPoolExecutorПроцессыСовременный способНизкая
ManagerПроцессы + синхр.Общее состояние между процессамиСредняя
subprocessВнешние программыЗапуск других скриптов/командНизкая
asyncioКорутины (один процесс)I/O-bound, сетевые запросыСредняя
celeryРаспределённая обработкаМикросервисы, масштабированиеВысокая

На собеседовании

"Для простой параллельной обработки — multiprocessing.Pool. Для более сложных сценариев — concurrent.futures.ProcessPoolExecutor. Для I/O-bound задач без создания процессов — asyncio. Для микросервисов и распределённой обработки — celery. Главное помнить, что процессы дорогие (память, создание), поэтому для I/O используй asyncio (одна корутина на I/O ждёт, другие выполняются)."

Практический пример: обработка списка файлов

from concurrent.futures import ProcessPoolExecutor
import os

def process_file(filepath):
    """Обработка одного файла"""
    with open(filepath, 'r') as f:
        lines = f.readlines()
    return len(lines)

if __name__ == '__main__':
    files = [f for f in os.listdir('.') if f.endswith('.txt')]
    
    # Параллельно обрабатываем все файлы
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(process_file, files)
    
    for filepath, line_count in zip(files, results):
        print(f"{filepath}: {line_count} строк")