← Назад к вопросам
Как делить очереди?
2.7 Senior🔥 151 комментариев
#Архитектура и паттерны#Брокеры сообщений и очереди
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Разделение (шардирование) очередей сообщений
Разделение очередей — это техника масштабирования, которая распределяет сообщения по нескольким независимым очередям (партициям) для параллельной обработки и лучшей пропускной способности.
Проблемы одной очереди
// ❌ Одна очередь - узкое место
┌─────────────────────┐
│ Single Queue │ <- все сообщения сюда
│ Processing: 10 msg/s│
└─────────────────────┘
↓
[One Consumer]
(обрабатывает 10 msg/s)
// Проблемы:
1. Максимум 1 consumer может обрабатывать
2. Bottleneck: скорость ограничена 1 потребителем
3. Если consumer упадёт, вся система стопится
Решение: разделённые очереди (партиции)
// ✅ Разделённые очереди
┌──────────────────┐
│ Topic/Queue │
├──────────────────┤
│ Partition 0: [1,4,7...] │ -> [Consumer 1]
│ Partition 1: [2,5,8...] │ -> [Consumer 2]
│ Partition 2: [3,6,9...] │ -> [Consumer 3]
└──────────────────┘
// Преимущества:
1. Параллельная обработка в 3x раз быстрее
2. Распределённость - каждый consumer независим
3. Масштабируемость - добавляй consumers
Как разделить сообщения
1. По ключу (Key-based partitioning)
Используется hash функция для распределения по партициям.
// RabbitMQ с routing key
import amqp from 'amqplib';
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// Создание exchange с партициями
await channel.assertExchange('orders', 'direct', { durable: true });
// Отправка сообщения с ключом
await channel.publish(
'orders',
'user:123', // routing key -> определяет партицию
Buffer.from(JSON.stringify({ orderId: 1, userId: 123 }))
);
// Потребитель на ключ
await channel.assertQueue('orders.queue.123');
await channel.bindQueue('orders.queue.123', 'orders', 'user:123');
await channel.consume('orders.queue.123', (msg) => {
console.log('Order:', JSON.parse(msg.content.toString()));
});
2. Kafka с явными партициями
Наиболее популярный способ разделения в больших системах.
import { Kafka } from 'kafkajs';
const kafka = new Kafka({
clientId: 'app',
brokers: ['localhost:9092'],
});
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'order-group' });
// Создание топика с 3 партициями
await kafka.admin().createTopics({
topics: [
{
topic: 'orders',
numPartitions: 3, // 3 партиции
replicationFactor: 2,
},
],
});
// Отправка с ключом (определяет партицию)
await producer.send({
topic: 'orders',
messages: [
{
key: `user:123`, // hash(key) % 3 = партиция
value: JSON.stringify({ orderId: 1, userId: 123 }),
},
{
key: `user:456`,
value: JSON.stringify({ orderId: 2, userId: 456 }),
},
],
});
// Потребители автоматически распределяются по партициям
await consumer.subscribe({ topic: 'orders' });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
key: message.key?.toString(),
value: message.value?.toString(),
});
},
});
Стратегии разделения
Стратегия 1: По User ID
// Все события одного пользователя в одной партиции
// Гарантирует順序 обработки для пользователя
function getPartition(userId: string, partitionCount: number): number {
return hashCode(userId) % partitionCount;
}
await producer.send({
topic: 'user-events',
messages: [
{
key: userId, // hash(userId) определяет партицию
value: JSON.stringify(event),
},
],
});
// user:123 всегда -> партиция 1
// user:456 всегда -> партиция 0
// user:789 всегда -> партиция 2
// Порядок сохраняется!
Стратегия 2: По Region/Country
function getRegionPartition(region: string): number {
const regions = ['us', 'eu', 'asia'];
const index = regions.indexOf(region);
return index >= 0 ? index : 0;
}
// us -> партиция 0
// eu -> партиция 1
// asia -> партиция 2
await producer.send({
topic: 'orders',
messages: [
{
key: `region:${order.region}`,
value: JSON.stringify(order),
},
],
});
Стратегия 3: Round-Robin (без гарантий порядка)
class RoundRobinPartitioner {
private counter = 0;
getPartition(partitionCount: number): number {
return (this.counter++) % partitionCount;
}
}
// Быстро распределяет нагрузку
// Но НЕ гарантирует порядок обработки
await producer.send({
topic: 'logs',
messages: logs.map(log => ({
value: JSON.stringify(log),
// key не указан = round-robin
})),
});
Практический пример: Обработка заказов
// Scenario: 1000 заказов в секунду
// Без шардирования: 1 consumer = 1000 msg/sec = узкое место
// С шардированием: 10 consumers на 10 партициях = 10x пропускная способность
import { Kafka, logLevel } from 'kafkajs';
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['localhost:9092'],
});
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'order-processing' });
// Producer: отправка заказов
async function publishOrder(order: Order) {
await producer.send({
topic: 'orders',
messages: [
{
// Ключ = user_id гарантирует, что все заказы одного пользователя
// обрабатываются в правильном порядке
key: order.userId,
value: JSON.stringify(order),
},
],
});
}
// Consumer: обработка заказов параллельно
async function startOrderProcessor() {
await consumer.subscribe({ topic: 'orders' });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const order = JSON.parse(message.value.toString());
console.log(`Processing order ${order.id} on partition ${partition}`);
// Бизнес-логика
await validateOrder(order);
await processPayment(order);
await updateInventory(order);
await sendConfirmation(order);
},
});
}
// Запуск нескольких consumers в разных процессах
for (let i = 0; i < 10; i++) {
startOrderProcessor();
// Каждый consumer автоматически получит назначенные партиции
// Consumer 1 -> Partitions [0, 3, 6, 9]
// Consumer 2 -> Partitions [1, 4, 7]
// Consumer 3 -> Partitions [2, 5, 8]
}
Количество партиций: как выбрать?
// Формула
Num_Partitions = (Expected_throughput_in_MB/s / Consumer_max_throughput_in_MB/s) + Buffer
// Пример:
// - Ожидаемая пропускная способность: 1000 msg/sec = 10 MB/s
// - Один consumer обрабатывает: 100 msg/sec = 1 MB/s
// - Буффер: 2 (на случай падения одного consumer)
// - Нужно партиций: (10 / 1) + 2 = 12 партиций
await kafka.admin().createTopics({
topics: [
{
topic: 'events',
numPartitions: 12, // 12 партиций
replicationFactor: 3,
},
],
});
Гарантии при разделении
interface PartitioningGuarantees {
// 1. Within-partition ordering: гарантирует порядок в одной партиции
ordering: true; // ✓ Сообщения в одной партиции обрабатываются по порядку
// 2. Between-partition ordering: НЕ гарантирует
ordering_across: false; // ✗ Разные партиции могут обрабатываться не по порядку
// 3. Scalability: линейное масштабирование
scalability: true; // ✓ Добавь партицию = добавь пропускную способность
}
Типичные ошибки
// ❌ Плохо: не учитываешь порядок
await producer.send({
topic: 'transactions',
messages: transactions.map(t => ({
value: JSON.stringify(t),
// Без key = round-robin = разные партиции
// Может привести к обработке 'cancel' раньше 'create'
})),
});
// ✅ Хорошо: используй key для гарантии порядка
await producer.send({
topic: 'transactions',
messages: transactions.map(t => ({
key: t.accountId, // Все транзакции одного счёта в одной партиции
value: JSON.stringify(t),
})),
});
Разделение очередей — это критичная техника для масштабирования. Выбирай стратегию разделения в зависимости от требований: гарантии порядка (user-based), географическое разделение (region-based) или просто равномерное распределение (round-robin).