← Назад к вопросам

Какие происходят технические события после создания агрегаторами событий?

3.0 Senior🔥 81 комментариев
#Архитектура и паттерны

Комментарии (1)

🐱
deepseek-v3.2PrepBro AI6 апр. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Отличный вопрос, который затрагивает сердцевину событийно-ориентированной архитектуры (Event-Driven Architecture, EDA). После успешного создания агрегатора событий (Event Aggregator) или, что более вероятно в контексте PHP, его публикации в шину событий (Event Bus) или брокер сообщений (например, RabbitMQ, Kafka), запускается цепочка важных технических процессов.

Давайте разберем их по порядку, от момента публикации до конечных последствий.

## 1. Публикация события в шину или брокер

Сам агрегатор (например, OrderWasPlaced) — это, как правило, иммутабельный объект-данных (DTO). Его создание — это лишь первый шаг. Далее происходит его публикация.

<?php

// 1. Создание агрегатора события
$orderPlacedEvent = new OrderWasPlaced(
    orderId: 'order_123',
    amount: 99.99,
    customerEmail: 'customer@example.com',
    occurredAt: new DateTimeImmutable()
);

// 2. Публикация через шину событий (синхронно в рамках процесса)
$this->eventBus->dispatch($orderPlacedEvent);

// ИЛИ публикация через брокер сообщений (асинхронно, вне процесса)
$this->messageBus->dispatch(
    new Envelope($orderPlacedEvent)
);
?>

## 2. Диспетчеризация и обработка синхронных слушателей

Если используется внутрипроцессная шина событий (например, Symfony Messenger в синхронном режиме, или собственный диспетчер), происходит немедленная диспетчеризация. Технически это выглядит так:

  1. Поиск слушателей (Listeners/Subscribers): Шина обращается к контейеру служб (например, PSR-11 Container) или собственному реестру, чтобы найти все сервисы, подписанные на тип события OrderWasPlaced.
  2. Вызов слушателей: Каждый слушатель вызывается последовательно, в том же процессе и потоке выполнения, что и код, создавший событие. Это критически важно для понимания.
<?php

// Пример слушателя (синхронного)
class SendOrderConfirmationEmailListener
{
    public function __invoke(OrderWasPlaced $event): void
    {
        // Это выполняется ПРЯМО СЕЙЧАС, в том же HTTP-запросе.
        // Если здесь выбросить исключение, оно может "сломать" основной поток.
        $this->emailService->sendConfirmation($event->getOrderId());
    }
}
?>
  • Плюсы: Простота, транзакционность (можно работать в рамках той же транзакции БД).
  • Минусы: Увеличивает время отклика основного действия. Сбой в слушателе может нарушить выполнение основной бизнес-логики.

## 3. Отправка в брокер сообщений для асинхронной обработки

Это более масштабируемый и устойчивый паттерн. Агрегатор события сериализуется (обычно в JSON или Avro) и помещается в очередь сообщений (например, в RabbitMQ) или топик (в Apache Kafka).

<?php
// Пример конфигурации транспорта для Symfony Messenger
// messenger.yaml
framework:
  messenger:
    transports:
      async_orders: '%env(MESSENGER_TRANSPORT_DSN)%' # например, doctrine://default или amqp://
    routing:
      'App\Event\OrderWasPlaced': async_orders
?>

Технические этапы:

  • Сериализация: Объект события преобразуется в строку. Важно, чтобы все данные были сериализуемы (избегать ресурсов, closure).
  • Транспортировка: Драйвер брокера (например, amqp или kafka) отправляет сообщение на сервер брокера.
  • Подтверждение: Брокер подтверждает получение. С этого момента ответственность за доставку лежит на брокере, а исходный PHP-процесс (например, HTTP-запрос) может завершиться.

## 4. Асинхронная обработка в воркерах

Это ключевой этап, происходящий уже вне контекста оригинального запроса.

  1. Запуск воркеров: Отдельные CLI-процессы (воркеры) постоянно опрашивают брокер сообщений на наличие новых сообщений.
  2. Получение и десериализация: Воркер получает сообщение, десериализует его и воссоздает объект агрегатора события (или его близкое представление).
  3. Выполнение бизнес-логики: Воркер запускает соответствующие обработчики (Handlers). Эти обработчики делают основную "работу" в ответ на событие.
<?php
// Пример асинхронного обработчика для Symfony Messenger
class UpdateInventoryHandler
{
    public function __invoke(OrderWasPlaced $event): void
    {
        // Этот код выполняется ПОЗЖЕ, в отдельном процессе.
        // Его падение не затронет пользователя, а сообщение будет повторно обработано или попадет в dead letter queue.
        $this->inventoryService->reserveItemsForOrder($event->getOrderId());
        // Это может быть долгая операция
        $this->analyticsService->trackPurchase($event);
    }
}
?>

## 5. Последствия и гарантии доставки

После отправки в брокер возникают вопросы устойчивости:

  • At-least-once delivery: Гарантия, что сообщение будет обработано хотя бы раз. Достигается подтверждением (ack) только после успешной обработки. Риск — двойная обработка. Требует идемпотентности обработчиков.
  • Exactly-once: Крайне сложно достичь на практике, обычно эмулируется комбинацией at-least-once и идемпотентности.
  • Retry & Dead Letter Queue (DLQ): При неудачной обработке (исключение) сообщение уходит на повторные попытки. После исчерпания попыток оно попадает в DLQ для ручного разбора.

## 6. Распространение в Event Store (для Event Sourcing)

Если используется Event Sourcing, то после создания агрегатор события обязательно сохраняется в Event Store (специализированное хранилище событий) как единственный источник истины о состоянии системы. Только после этого он может публиковаться для сторонних потребителей.

<?php
// Упрощенная иллюстрация
$this->eventStore->append(
    aggregateId: $order->getId(),
    events: [$orderPlacedEvent] // Агрегатор сохраняется
);
// Затем событие может быть отправлено в брокер для уведомления других систем
$this->eventBus->publishToExternalSystems($orderPlacedEvent);
?>

## Итог: что происходит технически

  1. Создание DTO-объекта события.
  2. Синхронная диспетчеризация (опционально, для внутрипроцессных, срочных действий).
  3. Сериализация и отправка в брокер сообщений (для асинхронных сценариев).
  4. Асинхронное потребление отдельными воркерами.
  5. Обеспечение надежности через подтверждения, повторные попытки и DLQ.
  6. Сохранение в Event Store (в архитектуре Event Sourcing).

Таким образом, создание агрегатора события — это лишь триггер для запуска сложного, часто распределенного, механизма обработки, который обеспечивает слабую связность, масштабируемость и отказоустойчивость backend-системы. В мире PHP это реализуется с помощью компонентов вроде Symfony Messenger, Laravel Queues + Events или фреймворков, таких как Spiral Framework с его RoadRunner.

Какие происходят технические события после создания агрегаторами событий? | PrepBro