Какую используешь семантику модели доставки сообщения?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Какую используешь семантику модели доставки сообщения?
Message Delivery Semantics — это гарантии того, как система обрабатывает сообщения в асинхронной коммуникации. Это критичный выбор при работе с очередями, message brokers и микросервисами. Объясню подробно, когда и что использовать.
Три основные семантики
1. At-Most-Once (максимум один раз)
Сообщение может быть потеряно, но никогда не будет обработано дважды.
// Пример: отправка сообщения в kafka с no ack
const producer = new KafkaProducer({
brokers: ['localhost:9092'],
acks: 0 // At-most-once: не ждем подтверждения
});
await producer.send({
topic: 'user.created',
messages: [{ value: JSON.stringify(user) }]
// Сообщение отправлено, но может быть потеряно
});
Сценарий:
Процесс отправляет сообщение
↓
Crash!
↓
Сообщение ПОТЕРЯНО
Вероятность duplicate: 0%
Вероятность loss: Высокая
Когда использую:
- Метрики (если потеряется одна метрика, не критично)
- Логирование аналитики
- Кэширование инвалидации (не критично)
- Уведомления (которые можно пропустить)
Преимущества:
- Максимальная производительность
- Минимум ресурсов
- Простая реализация
Недостатки:
- Данные могут быть потеряны
- Не подходит для критичных операций
// Реальный пример: отправка метрики
class MetricsService {
async recordPageView(userId: string) {
// Если это потеряется, не критично
await producer.send({
topic: 'page.viewed',
messages: [{ value: JSON.stringify({ userId, timestamp: Date.now() }) }]
});
}
}
2. At-Least-Once (минимум один раз)
Сообщение гарантированно обработано, но может быть обработано несколько раз.
// Пример: Kafka с acks: 'all'
const producer = new KafkaProducer({
brokers: ['localhost:9092'],
acks: 'all', // At-least-once: ждем подтверждения от всех
retries: 3 // Повторяем в случае ошибки
});
await producer.send({
topic: 'order.created',
messages: [{ value: JSON.stringify(order) }]
});
Сценарий:
Процесс отправляет сообщение
↓
Broker подтверждает
↓
Процесс делает Ack
↓
Crash в процессе!
↓
Message переобрабатывается
↓
Дубликат обработан!
Вероятность duplicate: Высокая
Вероятность loss: 0%
Когда использую:
- Платежи (но с обработкой дубликатов)
- Создание заказов
- Важные уведомления
- Изменения в БД
Требует идемпотентности!
// Нужно обработать duplicate гарантированно
const queue = new Queue('orders', { redis });
queue.process('order.created', async (job) => {
const { orderId, userId } = job.data;
// Проверяем, не создали ли уже
const existingOrder = await Order.findById(orderId);
if (existingOrder) {
return; // Уже обработано, не обрабатываем еще раз
}
// Создаем заказ
await Order.create({ id: orderId, userId });
// Отправляем email
await sendOrderConfirmationEmail(userId);
});
3. Exactly-Once (ровно один раз)
Сообщение обработано ровно один раз. Это самое сложное и дорогое в реализации.
// Kafka transactions + idempotent producer
const producer = new KafkaProducer({
brokers: ['localhost:9092'],
idempotent: true, // Exactly-once
maxInFlightRequests: 5,
transactionTimeout: 30000
});
const admin = kafka.admin();
await admin.createPartitioner();
const transaction = await producer.transaction();
await transaction.send({
topic: 'payment.processed',
messages: [{ value: JSON.stringify(payment) }]
});
await transaction.commit();
Сценарий:
Запись в БД (транзакция)
Отправка сообщения (транзакция)
↓
Commit
↓
Оба выполнены или оба откачены
Вероятность duplicate: 0%
Вероятность loss: 0%
Когда использую:
- Финансовые операции
- Критичные изменения состояния
- Операции с деньгами
Сложность:
- Требует distributed transactions
- Может быть медленнее
- Complexity в реализации
Реальный выбор в мою практику
Мой подход: At-Least-Once + Идемпотентность
Точный-Once сложный и медленный. At-Least-Once + правильная идемпотентность = практичное решение.
// Реальный пример: система обработки платежей
const paymentQueue = new Queue('payments', {
redis: { host: 'localhost' }
});
paymentQueue.process('payment.process', async (job) => {
const { paymentId, userId, amount } = job.data;
// Шаг 1: Проверяем, не обработана ли уже
const payment = await Payment.findById(paymentId);
if (payment.status === 'completed') {
console.log('Payment already processed, skipping');
return { success: true, idempotent: true };
}
// Шаг 2: Обновляем статус (guard against duplicates)
await Payment.update(
{ id: paymentId, status: 'pending' },
{ status: 'processing' }
);
try {
// Шаг 3: Обрабатываем платеж
const result = await paymentGateway.charge({
amount,
customerId: userId
});
// Шаг 4: Сохраняем результат
await Payment.update(
{ id: paymentId },
{ status: 'completed', transactionId: result.id }
);
// Шаг 5: Отправляем other events
await emailQueue.add({
userId,
type: 'payment_success',
paymentId
});
} catch (err) {
// Шаг 6: В случае ошибки откатываем
await Payment.update(
{ id: paymentId },
{ status: 'failed', error: err.message }
);
throw err; // Bull переповторит
}
});
Гарантирование идемпотентности
Техника 1: Уникальный ключ
// Использовать idempotency key
async function processPayment(paymentData) {
const { idempotencyKey, amount, userId } = paymentData;
// Проверяем, есть ли уже такой
const existing = await IdempotencyLog.findOne({ key: idempotencyKey });
if (existing) {
return existing.result;
}
// Выполняем операцию
const result = await chargeCreditCard(amount, userId);
// Сохраняем результат
await IdempotencyLog.create({
key: idempotencyKey,
result,
timestamp: Date.now()
});
return result;
}
Техника 2: Conditional update
// Использовать версионирование
async function updateUserBalance(userId, delta) {
const result = await db.query(
`UPDATE users
SET balance = balance + $1, version = version + 1
WHERE id = $2 AND version = $3
RETURNING *`,
[delta, userId, currentVersion]
);
if (result.rowCount === 0) {
// Не обновилось = уже обработано
return 'already_processed';
}
return result.rows[0];
}
Техника 3: State tracking
// Отслеживать статусы
async function processOrder(orderId) {
const order = await Order.findById(orderId);
// State machine
switch (order.status) {
case 'pending':
await order.update({ status: 'processing' });
// ... процессинг ...
await order.update({ status: 'completed' });
break;
case 'processing':
// Уже обрабатывается, может быть это дублик
await delay(100);
return processOrder(orderId); // Повторим через время
case 'completed':
// Уже завершено, не обрабатываем
return 'already_completed';
}
}
Сравнение очередей и их семантики
| Система | Поддерживает | Комментарий |
|---|---|---|
| RabbitMQ | At-most, At-least | Выбирается при конфигурации |
| Kafka | At-least, Exactly | Transactions + idempotent producer |
| Bull (Redis) | At-least | Нужна ручная идемпотентность |
| AWS SQS | At-least | Может быть дублик |
| AWS SNS | At-most | Может потеряться |
| Pub/Sub | At-most | Firebase, Google Cloud |
Мой практический выбор в разных сценариях
Сценарий 1: Уведомления
// At-most-once - потеряется 1 уведомление, не критично
await notificationQueue.add(
{ userId, message: 'Welcome!' },
{ removeOnFail: false } // Удалять после успеха
);
Сценарий 2: Email рассылка
// At-least-once + идемпотентность
// (Пользователь может получить дубликат письма, но не критично)
const emailQueue = new Queue('emails', {
redis,
defaultJobOptions: {
attempts: 5,
backoff: { type: 'exponential', delay: 2000 }
}
});
Сценарий 3: Платежи
// Exactly-once или At-least-once + strong idempotency
const paymentQueue = new Queue('payments', {
redis,
defaultJobOptions: {
attempts: 10, // Много повторов
removeOnComplete: false // Сохраняем логи
}
});
// ОБЯЗАТЕЛЬНО идемпотентные обработчики!
Сценарий 4: Analytics
// At-most-once - не нужна точность
await analyticsQueue.add(
{ event: 'page_view', userId, page },
{ removeOnFail: true } // Удалить при ошибке
);
Итоги
Мой подход в production:
-
По умолчанию: At-least-once + Идемпотентность
- Гарантирует обработку
- Дешево в реализации
- Надежно
-
Для критичных операций: Exactly-once с transactions
- Платежи
- Финансовые операции
- Критичные изменения состояния
-
Для non-critical: At-most-once
- Метрики
- Логирование
- Кэширование
Важный принцип: Идемпотентность — это не опция, это требование при работе с асинхронными системами. Даже если выбираешь At-least-once, всегда проектируй обработчики как идемпотентные.