← Назад к вопросам

Параллельные вычисления на PHP

3.0 Senior🔥 111 комментариев
#Архитектура и паттерны#Инфраструктура и DevOps

Условие

Написать класс (семейство классов), позволяющий реализовать параллельные вычисления на PHP.

Требования

  • Возможность запускать задачи параллельно
  • Получение результатов выполнения
  • Обработка ошибок в дочерних процессах
  • Ограничение количества одновременных процессов

Пример использования

$pool = new ProcessPool(4); // максимум 4 процесса

$pool->add(function() { return heavyCalculation1(); });
$pool->add(function() { return heavyCalculation2(); });
$pool->add(function() { return heavyCalculation3(); });

$results = $pool->wait(); // ждём завершения всех

Подходы

  • pcntl_fork
  • proc_open
  • Библиотеки: spatie/async, amphp/parallel
  • ReactPHP

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Решение

1. ProcessPool с pcntl_fork (Unix/Linux)

<?php

class ProcessTask {
    private int $pid = -1;
    private $callback;
    private $result = null;
    private ?Exception $exception = null;

    public function __construct(callable $callback) {
        $this->callback = $callback;
    }

    public function execute(): void {
        $this->pid = pcntl_fork();

        if ($this->pid === -1) {
            throw new \Exception("Could not fork process");
        }

        // Дочерний процесс
        if ($this->pid === 0) {
            try {
                $result = call_user_func($this->callback);
                echo json_encode(["success" => true, "result" => $result]);
                exit(0);
            } catch (\Throwable $e) {
                echo json_encode([
                    "success" => false,
                    "error" => $e->getMessage(),
                ]);
                exit(1);
            }
        }
    }

    public function getPid(): int { return $this->pid; }
    public function isRunning(): bool { return $this->pid > 0; }
}

class ProcessPool {
    private int $maxProcesses;
    private array $tasks = [];
    private array $results = [];

    public function __construct(int $maxProcesses = 4) {
        if (!extension_loaded("pcntl")) {
            throw new \Exception("pcntl extension is required");
        }
        $this->maxProcesses = $maxProcesses;
    }

    public function add(callable $callback): self {
        $this->tasks[] = new ProcessTask($callback);
        return $this;
    }

    public function wait(): array {
        $runningTasks = [];
        $taskIndex = 0;

        while ($taskIndex < count($this->tasks) || !empty($runningTasks)) {
            // Запускаем новые задачи если есть свободные слоты
            while (count($runningTasks) < $this->maxProcesses && $taskIndex < count($this->tasks)) {
                $task = $this->tasks[$taskIndex];
                $task->execute();
                $runningTasks[$task->getPid()] = $taskIndex;
                $taskIndex++;
            }

            if (empty($runningTasks)) break;

            // Ждём завершения любого процесса
            $pid = pcntl_waitpid(-1, $status, 0);

            if ($pid > 0 && isset($runningTasks[$pid])) {
                $index = $runningTasks[$pid];
                unset($runningTasks[$pid]);
            }
        }

        return $this->results;
    }
}

2. ProcessPool с proc_open (кроссплатформенный)

<?php

class AsyncTask {
    private $process = null;
    private $pipes = [];
    private $callback;
    private string $tempFile;
    private $result = null;
    private ?Exception $exception = null;

    public function __construct(callable $callback) {
        $this->callback = $callback;
        $this->tempFile = tempnam(sys_get_temp_dir(), "async_");
    }

    public function start(): void {
        // Сериализуем callback
        $serialized = base64_encode(serialize($this->callback));
        $code = sprintf(
            'php -r "$cb=unserialize(base64_decode(\'%s\')); echo json_encode([\"result\"=>$cb(), \"success\"=>true]);" > %s',
            $serialized,
            escapeshellarg($this->tempFile)
        );

        $this->process = proc_open(
            $code,
            [],
            $this->pipes
        );
    }

    public function isRunning(): bool {
        if (!is_resource($this->process)) return false;
        $status = proc_get_status($this->process);
        return $status["running"];
    }

    public function wait(): array {
        proc_close($this->process);
        
        $output = file_get_contents($this->tempFile);
        unlink($this->tempFile);

        try {
            return json_decode($output, true) ?? [];
        } catch (\Throwable $e) {
            return ["success" => false, "error" => $e->getMessage()];
        }
    }
}

class AsyncPool {
    private int $maxProcesses;
    private array $tasks = [];
    private array $results = [];

    public function __construct(int $maxProcesses = 4) {
        $this->maxProcesses = $maxProcesses;
    }

    public function add(callable $callback): self {
        $this->tasks[] = new AsyncTask($callback);
        return $this;
    }

    public function wait(): array {
        $running = [];

        // Запускаем начальный батч
        for ($i = 0; $i < min($this->maxProcesses, count($this->tasks)); $i++) {
            $this->tasks[$i]->start();
            $running[$i] = true;
        }

        $nextTaskIndex = $this->maxProcesses;

        while (!empty($running)) {
            foreach (array_keys($running) as $taskIndex) {
                $task = $this->tasks[$taskIndex];

                if (!$task->isRunning()) {
                    // Процесс завершился
                    $this->results[$taskIndex] = $task->wait();
                    unset($running[$taskIndex]);

                    // Запускаем следующую задачу
                    if ($nextTaskIndex < count($this->tasks)) {
                        $this->tasks[$nextTaskIndex]->start();
                        $running[$nextTaskIndex] = true;
                        $nextTaskIndex++;
                    }
                }
            }

            usleep(100000); // 100ms
        }

        return $this->results;
    }
}

3. Решение с использованием Threads (PHP 8.2+)

<?php

use \Thread;
use \Threaded;

class WorkerThread extends Thread {
    private string $id;
    private $callback;
    private $result = null;
    private ?Throwable $exception = null;

    public function __construct(string $id, callable $callback) {
        $this->id = $id;
        $this->callback = $callback;
    }

    public function run(): void {
        try {
            $this->result = call_user_func($this->callback);
        } catch (\Throwable $e) {
            $this->exception = $e;
        }
    }

    public function getResult() { return $this->result; }
    public function getException(): ?Throwable { return $this->exception; }
    public function getId(): string { return $this->id; }
}

class ThreadPool {
    private int $maxThreads;
    private array $threads = [];
    private array $results = [];

    public function __construct(int $maxThreads = 4) {
        $this->maxThreads = $maxThreads;
    }

    public function add(callable $callback): self {
        $thread = new WorkerThread(
            uniqid(),
            $callback
        );
        $this->threads[] = $thread;
        return $this;
    }

    public function wait(): array {
        $running = [];
        $index = 0;

        // Запускаем начальный батч потоков
        while ($index < count($this->threads) && count($running) < $this->maxThreads) {
            $thread = $this->threads[$index];
            $thread->start();
            $running[$index] = $thread;
            $index++;
        }

        // Ждём завершения потоков
        while (!empty($running)) {
            foreach (array_keys($running) as $threadIndex) {
                $thread = $running[$threadIndex];

                if (!$thread->isAlive()) {
                    // Поток завершился
                    if ($thread->getException()) {
                        $this->results[$threadIndex] = [
                            "success" => false,
                            "error" => $thread->getException()->getMessage(),
                        ];
                    } else {
                        $this->results[$threadIndex] = [
                            "success" => true,
                            "result" => $thread->getResult(),
                        ];
                    }

                    unset($running[$threadIndex]);

                    // Запускаем следующий поток
                    if ($index < count($this->threads)) {
                        $thread = $this->threads[$index];
                        $thread->start();
                        $running[$index] = $thread;
                        $index++;
                    }
                }
            }

            usleep(100000);
        }

        return $this->results;
    }
}

4. Использование spatie/async (рекомендуется)

<?php

use Spatie\Async\Pool;

// composer require spatie/async

$pool = Pool::create()
    ->concurrency(4);

$pool->add(function() {
    // Запросить из БД
    return "Result 1";
})->then(function($output) {
    echo "Completed: $output";
})->catch(function($exception) {
    echo "Error: " . $exception->getMessage();
});

$pool->add(function() {
    return "Result 2";
});

$results = $pool->wait();
var_dump($results);

5. Практический пример - параллельная загрузка данных

<?php

function fetchUserData(int $userId): array {
    // Имитация медленного запроса
    sleep(1);
    return [
        "id" => $userId,
        "name" => "User $userId",
        "email" => "user$userId@example.com",
    ];
}

function fetchUserPosts(int $userId): array {
    sleep(1);
    return [
        "user_id" => $userId,
        "posts" => [
            ["id" => 1, "title" => "Post 1"],
            ["id" => 2, "title" => "Post 2"],
        ],
    ];
}

// Последовательно: ~4 секунды
echo "Sequential:\n";
$start = microtime(true);
for ($i = 1; $i <= 4; $i++) {
    $user = fetchUserData($i);
    $posts = fetchUserPosts($i);
}
echo "Time: " . (microtime(true) - $start) . "s\n"; // ~4s

// Параллельно: ~1 секунда
echo "\nParallel:\n";
$start = microtime(true);

$pool = new AsyncPool(4);

for ($i = 1; $i <= 4; $i++) {
    $userId = $i;
    $pool->add(function() use ($userId) {
        return [
            "user" => fetchUserData($userId),
            "posts" => fetchUserPosts($userId),
        ];
    });
}

$results = $pool->wait();
echo "Time: " . (microtime(true) - $start) . "s\n"; // ~1s
var_dump($results);

6. Тесты

<?php

use PHPUnit\Framework\TestCase;

class ProcessPoolTest extends TestCase {
    public function test_single_task(): void {
        $pool = new AsyncPool(1);
        $pool->add(fn() => 5 * 2);
        $results = $pool->wait();
        
        $this->assertNotEmpty($results);
        $this->assertTrue($results[0]["success"] ?? false);
    }

    public function test_multiple_tasks(): void {
        $pool = new AsyncPool(4);
        
        for ($i = 0; $i < 10; $i++) {
            $pool->add(fn() => $i * 2);
        }
        
        $results = $pool->wait();
        $this->assertCount(10, $results);
    }

    public function test_error_handling(): void {
        $pool = new AsyncPool(1);
        $pool->add(fn() => throw new \Exception("Test error"));
        $results = $pool->wait();
        
        $this->assertFalse($results[0]["success"] ?? true);
    }
}

7. Сравнение подходов

ПодходКроссплатформаСложностьПроизводительностьРекомендуется
pcntl_forkНет (Unix)СредняяВысокаяLinux-сервера
proc_openДаСредняяХорошаяУниверсально
ThreadsДа (8.2+)ВышеОчень высокаяPHP 8.2+
spatie/asyncДаНизкаяВысокаяProduction
amphp/parallelДаСредняяВысокаяПродвинутые сценарии

8. Рекомендация

Для production - используй spatie/async:

composer require spatie/async
use Spatie\Async\Pool;

$pool = Pool::create()->concurrency(4);

for ($i = 0; $i < 10; $i++) {
    $pool->add(function() { /* task */ });
}

foreach ($pool->wait() as $result) {
    echo $result . PHP_EOL;
}

Для собеседования - покажи собственную реализацию с proc_open: Это покажет понимание процессов и управления ими на низком уровне.