← Назад к вопросам

Какие механизмы используются под капотом параллельного Stream

2.0 Middle🔥 251 комментариев
#Docker, Kubernetes и DevOps#Stream API и функциональное программирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Механизмы параллельного Stream в Java

Параллельные Stream используют мощный механизм под названием Fork/Join Framework для распределения работы между несколькими потоками. Давайте разберём, как это работает под капотом.

Fork/Join Framework

Fork/Join Framework — это основа параллельных Stream. Она основана на принципе "divide and conquer" (разделяй и властвуй).

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
int sum = numbers
    .parallelStream()
    .mapToInt(Integer::intValue)
    .sum();
// Под капотом используется ForkJoinPool

ForkJoinPool

ForkJoinPool — это специализированный пул потоков, оптимизированный для Fork/Join задач.

// Глобальный пул потоков (используется параллельными Stream)
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println("Threads: " + commonPool.getParallelism());
// Обычно = число процессоров - 1

Количество потоков по умолчанию:

  • На 4-ядерном процессоре: 3 потока
  • На 8-ядерном процессоре: 7 потоков
  • Формула: Runtime.getRuntime().availableProcessors() - 1

Можно изменить через system property:

java -Djava.util.concurrent.ForkJoinPool.common.parallelism=16 Application

Процесс выполнения параллельного Stream

Шаг 1: Splitting (разделение)

Исходный поток данных разделяется на подзадачи:

List<Integer> list = Arrays.asList(
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12
);
int result = list
    .parallelStream()
    .filter(x -> x > 5)
    .map(x -> x * 2)
    .sum();

// Разделение:
// Task 1: [1, 2, 3, 4, 5, 6]
// Task 2: [7, 8, 9, 10, 11, 12]
// Или ещё меньше, зависит от сплиттера

Шаг 2: Fork (ветвление)

Каждая подзадача отправляется в отдельный поток:

// Внутри это выглядит как:
ForkJoinTask<Integer> task1 = ForkJoinTask.adapt(
    () -> process(sublist1)
).fork();
ForkJoinTask<Integer> task2 = ForkJoinTask.adapt(
    () -> process(sublist2)
).fork();

Шаг 3: Join (объединение)

Потоки ждут результаты и объединяют их:

int result1 = task1.join();
int result2 = task2.join();
int finalResult = combine(result1, result2);

Spliterator

Spliterator отвечает за разделение данных на подзадачи.

// Каждая коллекция имеет свой Spliterator
Spliterator<Integer> spliterator = list.spliterator();

// Попытка разделить на две части
Spliterator<Integer> other = spliterator.trySplit();

if (other != null) {
    // Успешно разделилось на две части
    System.out.println("Split successful");
}

Визуализация разделения:

[1, 2, 3, 4, 5, 6, 7, 8]
         |-- trySplit()
    /            \
[1, 2, 3, 4]   [5, 6, 7, 8]
  |-- trySplit()
 /        \
[1, 2]  [3, 4]   [5, 6]  [7, 8]
|-- trySplit()
/ \
[1] [2]

Work Stealing

Work Stealing — умный механизм балансировки нагрузки между потоками.

Каждый поток имеет собственную очередь задач (deque):

Поток 1: [Task A, Task B, Task C] <- добавляет в конец
         ^ забирает из конца

Поток 2: [Task D, Task E] <- добавляет в конец
         ^ забирает из конца

Если Поток 2 финишил раньше:
  - Он может "украсть" Task C из конца очереди Потока 1
  - Это балансирует нагрузку

Преимущества Work Stealing:

  • Минимальная синхронизация (кража только с конца)
  • Хорошая балансировка работы
  • Отличная кэширование локальности (cache locality)

Пример реализации

abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    protected abstract V compute();
}

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1000;
    private int[] array;
    private int start, end;
    
    @Override
    protected Long compute() {
        // Если задача маленькая - считаем напрямую
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        }
        
        // Разделяем задачу
        int mid = (start + end) / 2;
        SumTask leftTask = new SumTask(array, start, mid);
        SumTask rightTask = new SumTask(array, mid, end);
        
        // Fork - запускаем асинхронно
        leftTask.fork();
        rightTask.fork();
        
        // Join - ждём результатов
        Long leftResult = leftTask.join();
        Long rightResult = rightTask.join();
        
        return leftResult + rightResult;
    }
}

// Использование
int[] array = new int[100_000];
SumTask task = new SumTask(array, 0, array.length);
Long result = ForkJoinPool.commonPool().invoke(task);

Когда использовать параллельные Stream

✅ Используй параллельные Stream когда:

// 1. Большие наборы данных (тысячи+ элементов)
List<Integer> hugeList = IntStream.range(0, 1_000_000)
    .boxed()
    .collect(toList());
int sum = hugeList.parallelStream().mapToInt(Integer::intValue).sum();

// 2. Тяжёлые операции (много CPU)
List<String> documents = getDocuments();
List<String> processed = documents
    .parallelStream()
    .map(this::complexProcessing) // CPU-intensive
    .collect(toList());

// 3. Работа на многоядерных системах

❌ НЕ используй параллельные Stream когда:

// 1. Маленькие наборы (< 1000 элементов)
List<Integer> small = Arrays.asList(1, 2, 3);
int sum = small.parallelStream().sum(); // OVERHEAD > BENEFIT

// 2. Операции с I/O (network, файлы)
List<String> urls = getUrls();
List<String> results = urls.parallelStream()
    .map(this::httpRequest) // ❌ I/O-bound, не CPU-bound
    .collect(toList());

// 3. Изменяемые shared state
int[] counter = {0};
list.parallelStream().forEach(x -> counter[0]++); // ❌ Race condition!

// 4. Упорядоченные операции важны
list.parallelStream().forEach(System.out::println); // ❌ Порядок не гарантирован

Производительность

// Бенчмарк параллельного vs последовательного
List<Integer> list = IntStream.range(1, 100_000_000)
    .boxed()
    .collect(toList());

// Последовательный
long start = System.nanoTime();
int result1 = list.stream().mapToInt(Integer::intValue).sum();
long sequential = System.nanoTime() - start;

// Параллельный
start = System.nanoTime();
int result2 = list.parallelStream().mapToInt(Integer::intValue).sum();
long parallel = System.nanoTime() - start;

System.out.println("Sequential: " + sequential);
System.out.println("Parallel: " + parallel);
System.out.println("Speedup: " + (sequential / (double) parallel));
// Обычно параллельный в 2-4 раза быстрее на многоядерных системах

Конфигурация ForkJoinPool

// Пользовательский ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(8);

// Использование с Stream
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Integer result = customPool.invoke(
    list.stream()
        .map(x -> x * 2)
        .collect(toCollection(ArrayList::new))
);

Параллельные Stream мощный инструмент, но требуют понимания внутренних механизмов для эффективного использования.

Какие механизмы используются под капотом параллельного Stream | PrepBro