← Назад к вопросам

Как достигается скорость при реактивном программировании

1.7 Middle🔥 161 комментариев
#Основы Java

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Ответ: Производительность в реактивном программировании

Реактивное программирование достигает высокой скорости благодаря совершенно другой модели обработки данных по сравнению с традиционной многопоточностью.

Основное отличие: потоки vs события

❌ Традиционная многопоточность (Blocking)

Поток 1: ████████████████████ (ждёт I/O) ░░░░░░░░░░ ████████
Поток 2: ████████████████████ (ждёт I/O) ░░░░░░░░░░ ████████
Поток 3: ████████████████████ (ждёт I/O) ░░░░░░░░░░ ████████

Проблема: много потоков ждут (░░░░░░░░░░), расходуя память и CPU
✅ Реактивное программирование (Non-blocking)

Поток 1: ██ (запрос) ► (события обработаны) ██ (результат)
Поток 2: ██ (запрос) ► (события обработаны) ██ (результат)
Поток 3: ██ (запрос) ► (события обработаны) ██ (результат)

Преимущество: потоки никогда не блокируются, постоянно работают

Как работает реактивное программирование

// Традиционный подход (Blocking)
public class BlockingExample {
    public static void main(String[] args) {
        // Выполняется последовательно
        String data = fetchDataFromDatabase();  // Блок на 100ms
        String transformed = transform(data);    // Блок на 50ms
        sendToClient(transformed);              // Блок на 30ms
        System.out.println("Total time: 180ms");
        
        // Если у нас 10 потоков, то для 10 запросов нужно 180ms * 10 = 1800ms
    }
    
    static String fetchDataFromDatabase() {
        Thread.sleep(100); // Блокируем поток
        return "data";
    }
    
    static String transform(String data) {
        Thread.sleep(50);
        return data.toUpperCase();
    }
    
    static void sendToClient(String data) {
        Thread.sleep(30);
    }
}

// Реактивный подход (Non-blocking)
public class ReactiveExample {
    public static void main(String[] args) throws Exception {
        // Выполняется асинхронно
        Mono<String> result = Mono
            .fromCallable(() -> fetchDataFromDatabase())  // Не блокирует!
            .delayElement(Duration.ofMillis(100))         // Имитация I/O
            .map(data -> transform(data))                 // Цепь преобразований
            .delayElement(Duration.ofMillis(50))
            .doOnNext(data -> sendToClient(data))        // Side effect
            .delayElement(Duration.ofMillis(30));
        
        // Теперь один поток может обрабатывать 10 запросов параллельно!
        result.subscribe(
            data -> System.out.println("Result: " + data),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Complete")
        );
        
        // Ждём завершения
        Thread.sleep(500);
    }
}

Ключевой механизм: Event Loop

┌─────────────────────────────────────────────────────────┐
│                    Event Loop (1 поток)                 │
├─────────────────────────────────────────────────────────┤
│                                                          │
│  1. Получить событие из очереди                         │
│  2. Обработать (быстро!)
│  3. Если нужна I/O → зарегистрировать callback
│  4. Перейти к следующему событию
│  5. Когда I/O готов → вернуть результат в очередь      │
│                                                          │
└─────────────────────────────────────────────────────────┘

Результат: 1 поток может обрабатывать 1000+ запросов одновременно!

Сравнение производительности

Традиционная многопоточность (10 потоков):
Память потока: 10 потоков × 1-2 MB = 10-20 MB
ОС переключение контекста: много
10 одновременных запросов: 180ms

┌────────────────────────┐
│ 10 × 180ms = 1800ms    │
└────────────────────────┘

Реактивное программирование (1 поток):
Память потока: 1 поток × 1-2 MB = 1-2 MB
ОС переключение контекста: минимум
10 одновременных запросов: 180ms (параллельно!)

┌────────────────────────┐
│ 180ms (18x快er)      │
└────────────────────────┘

Как Reactor достигает скорости

// 1. Non-blocking операции
public class NonBlockingIO {
    
    // ❌ Блокирует поток
    public String fetchDataBlocking() {
        // Это блокирует текущий поток на всё время запроса
        return RestTemplate
            .getForObject("http://api.example.com/data", String.class);
    }
    
    // ✅ Не блокирует
    public Mono<String> fetchDataNonBlocking() {
        return WebClient.create("http://api.example.com")
            .get()
            .uri("/data")
            .retrieve()
            .bodyToMono(String.class);
        // Поток не блокируется, ждёт callback
    }
}

// 2. Планирование на специальных потоках
public class Schedulers {
    // Reactor использует разные планировщики для разных задач:
    
    // Bounded Elastic: для I/O операций (подхватывает I/O потоки)
    // Parallel: для CPU-heavy операций
    // Immediate: выполняется сразу в текущем потоке
    // Single: выполняется в одном потоке
}

Пример: обработка 1000 запросов

@RestController
public class ReactiveController {
    
    @Autowired
    private WebClient webClient;
    
    // Традиционный подход: нужно 1000 потоков
    @GetMapping("/traditional/{id}")
    public ResponseEntity<String> getDataTraditional(@PathVariable Long id) {
        // Блокирует текущий поток на время запроса
        String data = webClient.get()
            .uri("/api/{id}", id)
            .retrieve()
            .bodyToMono(String.class)
            .block(); // ❌ Блокирует!
        return ResponseEntity.ok(data);
    }
    
    // Реактивный подход: нужно 10-50 потоков
    @GetMapping("/reactive/{id}")
    public Mono<ResponseEntity<String>> getDataReactive(@PathVariable Long id) {
        return webClient.get()
            .uri("/api/{id}", id)
            .retrieve()
            .bodyToMono(String.class)
            .map(ResponseEntity::ok);
        // ✅ Не блокирует! Один поток обрабатывает все запросы
    }
}

// При 1000 одновременных запросов:
// Traditional: нужно минимум 1000 потоков
// Reactive: нужно 10-50 потоков (зависит от CPU cores)

Механизм backpressure (давление обратно)

// Реактивное программирование может контролировать скорость

public class BackpressureExample {
    
    public static void main(String[] args) {
        Flux.range(1, 1000000)  // Миллион элементов
            .log()
            .onBackpressureBuffer(100)  // Буфер max 100 элементов
            .delayElements(Duration.ofMillis(10))  // Медленный subscriber
            .subscribe(
                item -> System.out.println("Received: " + item),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed")
            );
    }
}

// Реактивная система:
// 1. Source генерирует данные быстро
// 2. Subscriber обрабатывает медленно
// 3. Backpressure замедляет source, чтобы не переполнить память

Сравнение потребления памяти

Традиционная многопоточность (обработка 10k одновременных запросов):

┌─────────────────────────────────────────────┐
│ 10,000 потоков × 1.5 MB = 15 GB RAM ❌    │
│ Context switching overhead = высокая CPU  │
└─────────────────────────────────────────────┘

Реактивное программирование (обработка 10k одновременных запросов):

┌─────────────────────────────────────────────┐
│ 8 потоков × 1.5 MB = 12 MB RAM ✅         │
│ Context switching overhead = минимальная    │
└─────────────────────────────────────────────┘

Spring WebFlux vs Spring MVC

// Spring MVC (Blocking)
@RestController
@RequestMapping("/api/mvc")
public class MvcController {
    
    @GetMapping("/users/{id}")
    public ResponseEntity<User> getUser(@PathVariable Long id) {
        // Каждый запрос = один поток (Tomcat thread pool)
        User user = userService.getUserBlocking(id);  // Блокирует
        return ResponseEntity.ok(user);
    }
}

// При 1000 одновременных запросов:
// - Нужно 1000 потоков (или очередь ждёт)
// - ~1500 MB памяти
// - Высокая CPU для переключения контекста

// Spring WebFlux (Non-blocking, Reactive)
@RestController
@RequestMapping("/api/webflux")
public class WebFluxController {
    
    @GetMapping("/users/{id}")
    public Mono<ResponseEntity<User>> getUser(@PathVariable Long id) {
        // Один поток может обрабатывать много запросов
        return userService.getUserReactive(id)  // Не блокирует
            .map(ResponseEntity::ok);
    }
}

// При 1000 одновременных запросов:
// - 8 потоков (по количеству CPU cores)
// - ~20 MB памяти
// - Минимальная CPU для переключения
// - 10-100x выше пропускная способность!

Практический пример: микросервис

@Service
public class UserService {
    
    @Autowired
    private WebClient webClient;
    
    // Получение данных от 3 сервисов параллельно (не последовательно!)
    public Mono<UserProfile> getUserProfile(Long userId) {
        Mono<User> userMono = getUserData(userId);
        Mono<List<Order>> ordersMono = getOrders(userId);
        Mono<Preferences> prefsMono = getPreferences(userId);
        
        // Все 3 запроса выполняются параллельно
        return Mono.zip(userMono, ordersMono, prefsMono)
            .map(tuple -> {
                User user = tuple.getT1();
                List<Order> orders = tuple.getT2();
                Preferences prefs = tuple.getT3();
                return new UserProfile(user, orders, prefs);
            });
    }
    
    private Mono<User> getUserData(Long userId) {
        return webClient.get()
            .uri("/users/{id}", userId)
            .retrieve()
            .bodyToMono(User.class);
    }
    
    private Mono<List<Order>> getOrders(Long userId) {
        return webClient.get()
            .uri("/users/{id}/orders", userId)
            .retrieve()
            .bodyToFlux(Order.class)
            .collectList();
    }
    
    private Mono<Preferences> getPreferences(Long userId) {
        return webClient.get()
            .uri("/users/{id}/preferences", userId)
            .retrieve()
            .bodyToMono(Preferences.class);
    }
}

// При 1000 одновременных запросах:
// Traditional (3 последовательных HTTP запроса):
// 1000 потоков × (100ms + 50ms + 75ms) = 225 × 1000 = 225 секунд
//
// Reactive (3 параллельных HTTP запроса):
// 8 потоков × max(100ms, 50ms, 75ms) = 100ms для всех 1000 запросов
// = 2250x FASTER!

Основные источники скорости

1. Non-blocking I/O
   - Поток не ждёт I/O, вместо этого регистрирует callback
   - Один поток может обрабатывать много запросов параллельно

2. Правильное планирование потоков
   - Разные потоки для I/O и CPU операций
   - Минимальное переключение контекста

3. Параллелизм без потоков
   - Интероператорское переплетение (coroutines-like)
   - Обработка множества операций в одном потоке

4. Backpressure
   - Система не переполняется данными
   - Контролируется память и CPU

5. Интеграция с асинхронными библиотеками
   - WebClient вместо RestTemplate
   - R2DBC (Reactive Database Connectivity) вместо JDBC

Заключение

Реактивное программирование достигает высокой скорости благодаря:

  1. Non-blocking I/O — один поток не ждёт, пока заканчивается I/O
  2. Event Loop — один поток обрабатывает множество запросов
  3. Минимальное потребление памяти — 8 потоков вместо 1000
  4. Параллелизм — используется всё при помощи Mono/Flux
  5. Backpressure — система не переполняется

Результат: WebFlux может обработать в 10-100x больше одновременных запросов, чем Spring MVC, при том же количестве серверов и памяти.

Как достигается скорость при реактивном программировании | PrepBro