← Назад к вопросам

Что будет в RabbitMQ если сообщение не доставлено?

1.0 Junior🔥 41 комментариев
#Docker, Kubernetes и DevOps#JVM и управление памятью

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Что будет в RabbitMQ если сообщение не доставлено?

Отличный вопрос, касающийся надёжности обработки сообщений в RabbitMQ. Это критичный аспект при работе с очередями, и различные сценарии требуют разных подходов.

Сценарий 1: Сообщение не обработано (Exception)

Если обработчик выбросит исключение:

@Service
public class UserEventHandler {
    private final UserService userService;
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
    @RabbitListener(queues = "user-events")
    public void handleUserEvent(UserEvent event) {
        try {
            // Возможно исключение
            userService.processUser(event);
        } catch (UserProcessingException e) {
            // Что произойдёт дальше?
            logger.error("Failed to process event", e);
            throw e;  // Re-throw исключение
        }
    }
}

Поведение RabbitMQ:

  1. Без конфигурации: RabbitMQ по умолчанию отправляет сообщение обратно в очередь (requeue)
  2. Бесконечный цикл: Сообщение будет обработано снова и снова, создавая бесконечный loop
  3. Блокировка очереди: Одно плохое сообщение может заблокировать обработку остальных

Решение 1: Retry с Dead Letter Queue (DLQ)

Правильная конфигурация:

@Configuration
public class RabbitMQConfig {
    // Основная очередь
    public static final String USER_QUEUE = "user-events";
    public static final String USER_EXCHANGE = "user-events-exchange";
    public static final String USER_ROUTING_KEY = "user.*";
    
    // Dead Letter Queue (для неудачных сообщений)
    public static final String DLQ_QUEUE = "user-events-dlq";
    public static final String DLQ_EXCHANGE = "user-events-dlq-exchange";
    
    // Основная очередь с параметрами retry
    @Bean
    public Queue userQueue() {
        return QueueBuilder.durable(USER_QUEUE)
            .deadLetterExchange(DLQ_EXCHANGE)  // Куда идут неудачные сообщения
            .deadLetterRoutingKey("user.dlq")
            .build();
    }
    
    // Exchange для основной очереди
    @Bean
    public TopicExchange userExchange() {
        return new TopicExchange(USER_EXCHANGE, true, false);
    }
    
    // Binding
    @Bean
    public Binding userBinding() {
        return BindingBuilder.bind(userQueue())
            .to(userExchange())
            .with(USER_ROUTING_KEY);
    }
    
    // Dead Letter Queue
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_QUEUE)
            .build();
    }
    
    // Dead Letter Exchange
    @Bean
    public TopicExchange deadLetterExchange() {
        return new TopicExchange(DLQ_EXCHANGE, true, false);
    }
    
    // DLQ Binding
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
            .to(deadLetterExchange())
            .with("user.dlq");
    }
}

Обработчик с retry логикой:

@Service
public class UserEventHandler {
    private final UserService userService;
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
    @RabbitListener(queues = "user-events")
    public void handleUserEvent(UserEvent event, Message message) {
        try {
            userService.processUser(event);
            logger.info("Successfully processed user event: {}", event.getId());
        } catch (UserProcessingException e) {
            // Получить количество retry попыток
            Integer retryCount = (Integer) message.getMessageProperties()
                .getHeader("x-death") == null ? 0 : 
                ((List<Map<?, ?>>) message.getMessageProperties()
                    .getHeader("x-death")).get(0).get("count");
            
            if (retryCount < 3) {
                // Повторный retry — отправить обратно в очередь
                logger.warn("Retry attempt {} for event: {}", retryCount + 1, event.getId());
                throw new AmqpRejectAndDontRequeueException(e);
            } else {
                // Максимальное количество retry — отправить в DLQ
                logger.error("Max retries exceeded for event: {}, sending to DLQ", event.getId());
                throw new AmqpRejectAndDontRequeueException(e);
            }
        }
    }
    
    // Обработчик для Dead Letter Queue
    @RabbitListener(queues = "user-events-dlq")
    public void handleDlqEvent(UserEvent event) {
        logger.error("Processing DLQ message: {}", event.getId());
        // Отправить в Slack/Email алерт для администраторов
        // Или сохранить в database для дальнейшего анализа
    }
}

Решение 2: Retry с Delay

Использование TTL (Time To Live) для delay:

@Configuration
public class RabbitMQRetryConfig {
    // Основная очередь
    public static final String MAIN_QUEUE = "user-events";
    
    // Retry очередь с delay
    public static final String RETRY_QUEUE = "user-events-retry";
    public static final String RETRY_EXCHANGE = "user-events-retry-exchange";
    
    // Основная очередь
    @Bean
    public Queue mainQueue() {
        return QueueBuilder.durable(MAIN_QUEUE)
            .deadLetterExchange(RETRY_EXCHANGE)
            .build();
    }
    
    // Retry очередь с TTL (время жизни сообщения)
    @Bean
    public Queue retryQueue() {
        return QueueBuilder.durable(RETRY_QUEUE)
            .ttl(5000)  // 5 секунд delay перед retry
            .deadLetterExchange("user-events-exchange")
            .deadLetterRoutingKey("user.*")
            .build();
    }
    
    @Bean
    public TopicExchange retryExchange() {
        return new TopicExchange(RETRY_EXCHANGE, true, false);
    }
    
    @Bean
    public Binding retryBinding() {
        return BindingBuilder.bind(retryQueue())
            .to(retryExchange())
            .with("user.*");
    }
}

Обработчик с retry delay:

@Service
public class UserEventHandlerWithDelay {
    private final UserService userService;
    private final RabbitTemplate rabbitTemplate;
    
    @RabbitListener(queues = "user-events")
    public void handleUserEvent(UserEvent event, Message message) {
        try {
            userService.processUser(event);
        } catch (TemporaryException e) {
            // Временная ошибка (например, сервис недоступен)
            // Отправить в retry очередь с delay
            Integer retryCount = getRetryCount(message);
            
            if (retryCount < 3) {
                // Пересправить через 5 секунд
                rabbitTemplate.convertAndSend("user-events-retry-exchange", 
                    "user.retry", event);
                logger.warn("Sending to retry queue, attempt {}", retryCount + 1);
            } else {
                // Отправить в DLQ после 3 попыток
                logger.error("Max retries exceeded");
                throw new AmqpRejectAndDontRequeueException(e);
            }
        }
    }
    
    private Integer getRetryCount(Message message) {
        Object xDeathHeader = message.getMessageProperties().getHeader("x-death");
        if (xDeathHeader == null) return 0;
        
        List<Map<?, ?>> deaths = (List<Map<?, ?>>) xDeathHeader;
        return deaths.isEmpty() ? 0 : (Integer) deaths.get(0).get("count");
    }
}

Сценарий 2: Consumer Crash (Потребитель упал)

Если consumer неожиданно завершился:

@Service
public class UserEventHandler {
    @RabbitListener(queues = "user-events")
    public void handleUserEvent(UserEvent event) {
        logger.info("Started processing event: {}", event.getId());
        userService.processUser(event);
        logger.info("Completed processing event: {}", event.getId());
        // Если процесс убьётся ДО этой строки, сообщение вернётся в очередь
    }
}

Поведение:

  1. RabbitMQ ожидает acknowledgment от consumer
  2. Если consumer не отправил ack — сообщение остаётся в очереди
  3. Когда новый consumer подключится — получит это сообщение

Конфигурация:

@Configuration
public class RabbitAckConfig {
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = 
            new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        
        // Использовать MANUAL acknowledgment
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        
        return factory;
    }
}

@Service
public class ManualAckHandler {
    @RabbitListener(queues = "user-events")
    public void handleUserEvent(UserEvent event, Channel channel, 
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            userService.processUser(event);
            // Отправить positive acknowledgment
            channel.basicAck(deliveryTag, false);
            logger.info("Successfully acknowledged event: {}", event.getId());
        } catch (Exception e) {
            try {
                // Отправить negative acknowledgment и перевернуть в очередь
                channel.basicNack(deliveryTag, false, true);
                logger.error("Failed to process, requeuing event", e);
            } catch (IOException ioe) {
                logger.error("Failed to nack message", ioe);
            }
        }
    }
}

Best Practices

1. Всегда использовать DLQ:

// Конфигурация с DLQ — всегда
@Bean
public Queue queueWithDlq() {
    return QueueBuilder.durable("main-queue")
        .deadLetterExchange("dlq-exchange")
        .deadLetterRoutingKey("dlq-key")
        .build();
}

2. Установить лимит retry:

// Не делать бесконечные retry
if (retryCount >= MAX_RETRIES) {
    // Отправить в DLQ
    sendToDlq(message);
}

3. Логирование и мониторинг:

private void handleFailure(UserEvent event, Exception e) {
    logger.error("Event processing failed: id={}, reason={}", 
        event.getId(), e.getMessage());
    
    // Отправить метрику
    meterRegistry.counter("rabbit.message.failure").increment();
    
    // Отправить алерт если критичный
    if (isCritical(e)) {
        alertingService.sendAlert("RabbitMQ processing failure");
    }
}

Заключение

Если сообщение не доставлено в RabbitMQ:

  1. Без конфигурации — бесконечный retry (плохо)
  2. С DLQ — отправка в Dead Letter Queue после N попыток (хорошо)
  3. С delay — retry с задержкой для временных ошибок (отлично)
  4. С manual ack — полный контроль над потоком обработки

Правильная конфигурация RabbitMQ гарантирует:

  • Надёжную доставку сообщений
  • Обработку ошибок без потери данных
  • Видимость проблем в DLQ
  • Graceful degradation при сбоях
Что будет в RabbitMQ если сообщение не доставлено? | PrepBro