← Назад к вопросам
Параллельные вычисления на PHP
3.0 Senior🔥 111 комментариев
#Архитектура и паттерны#Инфраструктура и DevOps
Условие
Написать класс (семейство классов), позволяющий реализовать параллельные вычисления на PHP.
Требования
- Возможность запускать задачи параллельно
- Получение результатов выполнения
- Обработка ошибок в дочерних процессах
- Ограничение количества одновременных процессов
Пример использования
$pool = new ProcessPool(4); // максимум 4 процесса
$pool->add(function() { return heavyCalculation1(); });
$pool->add(function() { return heavyCalculation2(); });
$pool->add(function() { return heavyCalculation3(); });
$results = $pool->wait(); // ждём завершения всех
Подходы
- pcntl_fork
- proc_open
- Библиотеки: spatie/async, amphp/parallel
- ReactPHP
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Решение
1. ProcessPool с pcntl_fork (Unix/Linux)
<?php
class ProcessTask {
private int $pid = -1;
private $callback;
private $result = null;
private ?Exception $exception = null;
public function __construct(callable $callback) {
$this->callback = $callback;
}
public function execute(): void {
$this->pid = pcntl_fork();
if ($this->pid === -1) {
throw new \Exception("Could not fork process");
}
// Дочерний процесс
if ($this->pid === 0) {
try {
$result = call_user_func($this->callback);
echo json_encode(["success" => true, "result" => $result]);
exit(0);
} catch (\Throwable $e) {
echo json_encode([
"success" => false,
"error" => $e->getMessage(),
]);
exit(1);
}
}
}
public function getPid(): int { return $this->pid; }
public function isRunning(): bool { return $this->pid > 0; }
}
class ProcessPool {
private int $maxProcesses;
private array $tasks = [];
private array $results = [];
public function __construct(int $maxProcesses = 4) {
if (!extension_loaded("pcntl")) {
throw new \Exception("pcntl extension is required");
}
$this->maxProcesses = $maxProcesses;
}
public function add(callable $callback): self {
$this->tasks[] = new ProcessTask($callback);
return $this;
}
public function wait(): array {
$runningTasks = [];
$taskIndex = 0;
while ($taskIndex < count($this->tasks) || !empty($runningTasks)) {
// Запускаем новые задачи если есть свободные слоты
while (count($runningTasks) < $this->maxProcesses && $taskIndex < count($this->tasks)) {
$task = $this->tasks[$taskIndex];
$task->execute();
$runningTasks[$task->getPid()] = $taskIndex;
$taskIndex++;
}
if (empty($runningTasks)) break;
// Ждём завершения любого процесса
$pid = pcntl_waitpid(-1, $status, 0);
if ($pid > 0 && isset($runningTasks[$pid])) {
$index = $runningTasks[$pid];
unset($runningTasks[$pid]);
}
}
return $this->results;
}
}
2. ProcessPool с proc_open (кроссплатформенный)
<?php
class AsyncTask {
private $process = null;
private $pipes = [];
private $callback;
private string $tempFile;
private $result = null;
private ?Exception $exception = null;
public function __construct(callable $callback) {
$this->callback = $callback;
$this->tempFile = tempnam(sys_get_temp_dir(), "async_");
}
public function start(): void {
// Сериализуем callback
$serialized = base64_encode(serialize($this->callback));
$code = sprintf(
'php -r "$cb=unserialize(base64_decode(\'%s\')); echo json_encode([\"result\"=>$cb(), \"success\"=>true]);" > %s',
$serialized,
escapeshellarg($this->tempFile)
);
$this->process = proc_open(
$code,
[],
$this->pipes
);
}
public function isRunning(): bool {
if (!is_resource($this->process)) return false;
$status = proc_get_status($this->process);
return $status["running"];
}
public function wait(): array {
proc_close($this->process);
$output = file_get_contents($this->tempFile);
unlink($this->tempFile);
try {
return json_decode($output, true) ?? [];
} catch (\Throwable $e) {
return ["success" => false, "error" => $e->getMessage()];
}
}
}
class AsyncPool {
private int $maxProcesses;
private array $tasks = [];
private array $results = [];
public function __construct(int $maxProcesses = 4) {
$this->maxProcesses = $maxProcesses;
}
public function add(callable $callback): self {
$this->tasks[] = new AsyncTask($callback);
return $this;
}
public function wait(): array {
$running = [];
// Запускаем начальный батч
for ($i = 0; $i < min($this->maxProcesses, count($this->tasks)); $i++) {
$this->tasks[$i]->start();
$running[$i] = true;
}
$nextTaskIndex = $this->maxProcesses;
while (!empty($running)) {
foreach (array_keys($running) as $taskIndex) {
$task = $this->tasks[$taskIndex];
if (!$task->isRunning()) {
// Процесс завершился
$this->results[$taskIndex] = $task->wait();
unset($running[$taskIndex]);
// Запускаем следующую задачу
if ($nextTaskIndex < count($this->tasks)) {
$this->tasks[$nextTaskIndex]->start();
$running[$nextTaskIndex] = true;
$nextTaskIndex++;
}
}
}
usleep(100000); // 100ms
}
return $this->results;
}
}
3. Решение с использованием Threads (PHP 8.2+)
<?php
use \Thread;
use \Threaded;
class WorkerThread extends Thread {
private string $id;
private $callback;
private $result = null;
private ?Throwable $exception = null;
public function __construct(string $id, callable $callback) {
$this->id = $id;
$this->callback = $callback;
}
public function run(): void {
try {
$this->result = call_user_func($this->callback);
} catch (\Throwable $e) {
$this->exception = $e;
}
}
public function getResult() { return $this->result; }
public function getException(): ?Throwable { return $this->exception; }
public function getId(): string { return $this->id; }
}
class ThreadPool {
private int $maxThreads;
private array $threads = [];
private array $results = [];
public function __construct(int $maxThreads = 4) {
$this->maxThreads = $maxThreads;
}
public function add(callable $callback): self {
$thread = new WorkerThread(
uniqid(),
$callback
);
$this->threads[] = $thread;
return $this;
}
public function wait(): array {
$running = [];
$index = 0;
// Запускаем начальный батч потоков
while ($index < count($this->threads) && count($running) < $this->maxThreads) {
$thread = $this->threads[$index];
$thread->start();
$running[$index] = $thread;
$index++;
}
// Ждём завершения потоков
while (!empty($running)) {
foreach (array_keys($running) as $threadIndex) {
$thread = $running[$threadIndex];
if (!$thread->isAlive()) {
// Поток завершился
if ($thread->getException()) {
$this->results[$threadIndex] = [
"success" => false,
"error" => $thread->getException()->getMessage(),
];
} else {
$this->results[$threadIndex] = [
"success" => true,
"result" => $thread->getResult(),
];
}
unset($running[$threadIndex]);
// Запускаем следующий поток
if ($index < count($this->threads)) {
$thread = $this->threads[$index];
$thread->start();
$running[$index] = $thread;
$index++;
}
}
}
usleep(100000);
}
return $this->results;
}
}
4. Использование spatie/async (рекомендуется)
<?php
use Spatie\Async\Pool;
// composer require spatie/async
$pool = Pool::create()
->concurrency(4);
$pool->add(function() {
// Запросить из БД
return "Result 1";
})->then(function($output) {
echo "Completed: $output";
})->catch(function($exception) {
echo "Error: " . $exception->getMessage();
});
$pool->add(function() {
return "Result 2";
});
$results = $pool->wait();
var_dump($results);
5. Практический пример - параллельная загрузка данных
<?php
function fetchUserData(int $userId): array {
// Имитация медленного запроса
sleep(1);
return [
"id" => $userId,
"name" => "User $userId",
"email" => "user$userId@example.com",
];
}
function fetchUserPosts(int $userId): array {
sleep(1);
return [
"user_id" => $userId,
"posts" => [
["id" => 1, "title" => "Post 1"],
["id" => 2, "title" => "Post 2"],
],
];
}
// Последовательно: ~4 секунды
echo "Sequential:\n";
$start = microtime(true);
for ($i = 1; $i <= 4; $i++) {
$user = fetchUserData($i);
$posts = fetchUserPosts($i);
}
echo "Time: " . (microtime(true) - $start) . "s\n"; // ~4s
// Параллельно: ~1 секунда
echo "\nParallel:\n";
$start = microtime(true);
$pool = new AsyncPool(4);
for ($i = 1; $i <= 4; $i++) {
$userId = $i;
$pool->add(function() use ($userId) {
return [
"user" => fetchUserData($userId),
"posts" => fetchUserPosts($userId),
];
});
}
$results = $pool->wait();
echo "Time: " . (microtime(true) - $start) . "s\n"; // ~1s
var_dump($results);
6. Тесты
<?php
use PHPUnit\Framework\TestCase;
class ProcessPoolTest extends TestCase {
public function test_single_task(): void {
$pool = new AsyncPool(1);
$pool->add(fn() => 5 * 2);
$results = $pool->wait();
$this->assertNotEmpty($results);
$this->assertTrue($results[0]["success"] ?? false);
}
public function test_multiple_tasks(): void {
$pool = new AsyncPool(4);
for ($i = 0; $i < 10; $i++) {
$pool->add(fn() => $i * 2);
}
$results = $pool->wait();
$this->assertCount(10, $results);
}
public function test_error_handling(): void {
$pool = new AsyncPool(1);
$pool->add(fn() => throw new \Exception("Test error"));
$results = $pool->wait();
$this->assertFalse($results[0]["success"] ?? true);
}
}
7. Сравнение подходов
| Подход | Кроссплатформа | Сложность | Производительность | Рекомендуется |
|---|---|---|---|---|
| pcntl_fork | Нет (Unix) | Средняя | Высокая | Linux-сервера |
| proc_open | Да | Средняя | Хорошая | Универсально |
| Threads | Да (8.2+) | Выше | Очень высокая | PHP 8.2+ |
| spatie/async | Да | Низкая | Высокая | Production |
| amphp/parallel | Да | Средняя | Высокая | Продвинутые сценарии |
8. Рекомендация
Для production - используй spatie/async:
composer require spatie/async
use Spatie\Async\Pool;
$pool = Pool::create()->concurrency(4);
for ($i = 0; $i < 10; $i++) {
$pool->add(function() { /* task */ });
}
foreach ($pool->wait() as $result) {
echo $result . PHP_EOL;
}
Для собеседования - покажи собственную реализацию с proc_open: Это покажет понимание процессов и управления ими на низком уровне.