← Назад к вопросам

Какой механизм используется под капотом у параллельного стрима?

2.0 Middle🔥 131 комментариев
#Stream API и функциональное программирование#Многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Механизм параллельных стримов: ForkJoinPool

Основа: ForkJoinPool

Параллельные стримы используют ForkJoinPool — специальный пул потоков, оптимизированный для разделения работы и работы кражи (work-stealing).

// Когда вы пишите:
List<Integer> data = generateMillionIntegers();
long result = data.parallelStream()
    .map(n -> n * 2)
    .filter(n -> n > 100)
    .reduce(0, Integer::sum);

// ПОД КАПОТОМ:
ForkJoinPool.commonPool().execute(() -> {
    // Работа распределяется между потоками
    // Используется ForkJoinTask
    // Потоки крадут работу друг у друга (work-stealing)
});

Общий пул (Common ForkJoinPool)

Инициализация

public class ForkJoinPoolExample {
    public static void main(String[] args) {
        // Получить общий пул
        ForkJoinPool commonPool = ForkJoinPool.commonPool();
        
        System.out.println("Parallelism level: " + 
            commonPool.getParallelism());  // Обычно = CPU cores - 1
        // На 4-ядерном: 3
        // На 8-ядерном: 7
        // На 16-ядерном: 15
    }
}

Конфигурация через JVM параметры

# Установить размер пула
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=8 MyApp

# Отключить асинхронный режим
java -Djava.util.concurrent.ForkJoinPool.common.asyncMode=true MyApp

ForkJoinTask

Иерархия задач

ForkJoinTask
├── RecursiveAction
│   ├── SumAction
│   ├── FilterAction
│   └── MapAction
└── RecursiveTask<R>
    ├── SumTask
    ├── CountTask
    └── ...

Как работает RecursiveTask

public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;  // Порог разделения
    private int[] array;
    private int start, end;
    
    public SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Long compute() {
        // Если работа маленькая — выполни напрямую
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        }
        
        // Разделить (Fork)
        int mid = (start + end) / 2;
        SumTask leftTask = new SumTask(array, start, mid);
        SumTask rightTask = new SumTask(array, mid, end);
        
        // Отправить левую задачу в пул
        leftTask.fork();
        
        // Вычислить правую задачу в этом потоке
        long rightResult = rightTask.compute();
        
        // Получить результат левой задачи (Join)
        long leftResult = leftTask.join();
        
        return leftResult + rightResult;
    }
    
    public static void main(String[] args) {
        int[] array = new int[100_000_000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i;
        }
        
        ForkJoinPool pool = ForkJoinPool.commonPool();
        SumTask task = new SumTask(array, 0, array.length);
        Long result = pool.invoke(task);
        System.out.println("Sum: " + result);
    }
}

Work-stealing алгоритм

Проблема без work-stealing

Поток 1: Много работы (30 задач)
Поток 2: Мало работы (2 задачи)
Поток 3: Никакой работы (свободен)
Поток 4: Очень много работы (50 задач)

Результат: неравномерное распределение

Work-stealing решение

Каждый поток имеет двусторонний очередь (deque):

Поток 1: [T1] [T2] [T3] ← добавляет в конец
Поток 2: [T10] [T11] [T12] [T13] [T14]
Поток 3: (пусто, свободен)
Поток 4: [T30] [T31] ... [T60] ← много задач

Когда Поток 1 свободен:
- Он берёт задачу из начала своей очереди (свою)
- Если своя пуста — крадёт из конца чужой очереди (из Потока 4)

Когда Поток 3 свободен:
- Крадёт из Потока 4, затем из Потока 1

Результат: хорошее распределение!

Реализация work-stealing

public class WorkStealingExample {
    private static class Task implements Runnable {
        private int id;
        private long duration;
        
        public Task(int id, long duration) {
            this.id = id;
            this.duration = duration;
        }
        
        @Override
        public void run() {
            // Имитируем работу
            long start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < duration) {
                // Spin
            }
            System.out.println("Task " + id + " completed on " + 
                Thread.currentThread().getName());
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        // Work-stealing пул распределит задачи
        ForkJoinPool pool = new ForkJoinPool(4);  // 4 потока
        
        // Задачи с разной длительностью
        List<Task> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            tasks.add(new Task(i, 100 + i * 50));  // Разная длительность
        }
        
        // ForkJoinPool автоматически балансирует нагрузку
        for (Task task : tasks) {
            pool.execute(task);
        }
        
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}

Как параллельный стрим использует ForkJoinPool

Схема обработки

// Когда вы пишете:
list.parallelStream()
    .filter(n -> n > 10)
    .map(n -> n * 2)
    .reduce(0, Integer::sum);

// Происходит следующее:

// 1. Создаётся StreamShape, определяется граф операций
stream = list.parallelStream()
    .filter(n -> n > 10)     // Операция 1
    .map(n -> n * 2)         // Операция 2
    .reduce(0, Integer::sum); // Операция 3 (терминальная)

// 2. Стрим разбивается на куски (splits)
Spliterator spliterator = list.spliterator();
// Если 1000 элементов → может разбить на 4 части по 250

// 3. Для каждого куска создаётся ForkJoinTask
Task chunk1 = new FilterMapReduceTask(list, 0, 250);
Task chunk2 = new FilterMapReduceTask(list, 250, 500);
Task chunk3 = new FilterMapReduceTask(list, 500, 750);
Task chunk4 = new FilterMapReduceTask(list, 750, 1000);

// 4. Задачи выполняются параллельно в ForkJoinPool
ForkJoinPool.commonPool().invoke(combinedTask);

// 5. Результаты комбинируются
result = chunk1.join() + chunk2.join() + chunk3.join() + chunk4.join();

Внутренняя реализация

Spliterator для параллельного разбиения

public class SpliteratorExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        
        // Получить spliterator
        Spliterator<Integer> spliterator = numbers.spliterator();
        
        System.out.println("Characteristics: " + 
            spliterator.characteristics());
        // Может включать: ORDERED, SIZED, SUBSIZED, etc.
        
        // Попытка разбить
        Spliterator<Integer> rightSplit = spliterator.trySplit();
        // Оригинал получит элементы 0-3
        // rightSplit получит элементы 4-7
        
        // Это позволяет параллельному стриму
        // разбить работу рекурсивно
    }
}

// Пример Spliterator для массива:
public class ArraySpliterator {
    public static void main(String[] args) {
        int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        
        Spliterator.OfInt spliterator = 
            Arrays.spliterator(array);
        
        // Левая часть: [1, 2, 3, 4, 5]
        Spliterator.OfInt left = spliterator;
        
        // Правая часть: [6, 7, 8, 9, 10]
        Spliterator.OfInt right = spliterator.trySplit();
        
        // Каждая часть обрабатывается отдельно
    }
}

Поток выполнения на практике

public class DetailedExecutionFlow {
    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add(i);
        }
        
        // Параллельная обработка
        int result = data.parallelStream()
            .peek(n -> System.out.println(
                "Processing " + n + " on " + 
                Thread.currentThread().getName()))
            .filter(n -> n % 2 == 0)  // Только чётные
            .map(n -> n * 2)          // Умножить на 2
            .reduce(0, Integer::sum);  // Сумма
        
        System.out.println("Result: " + result);
    }
}

// Вывод (примерный):
// Processing 0 on ForkJoinPool.commonPool-worker-1
// Processing 1 on ForkJoinPool.commonPool-worker-2
// Processing 2 on ForkJoinPool.commonPool-worker-3
// Processing 3 on ForkJoinPool.commonPool-worker-4
// Processing 4 on ForkJoinPool.commonPool-worker-1  // Поток 1 снова работает
// ...

Создание собственного ForkJoinPool

public class CustomForkJoinPool {
    public static void main(String[] args) {
        // Общий пул может быть узким местом в многопоточном коде
        // Создайте свой пул для важных операций
        
        ForkJoinPool customPool = new ForkJoinPool(8);  // 8 потоков
        
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 10_000_000; i++) {
            data.add(i);
        }
        
        // Использовать свой пул
        long result = customPool.invoke(
            new SumTask(data, 0, data.size())
        );
        
        System.out.println("Result: " + result);
        
        customPool.shutdown();
    }
    
    static class SumTask extends RecursiveTask<Long> {
        private List<Integer> data;
        private int start, end;
        private static final int THRESHOLD = 1000;
        
        SumTask(List<Integer> data, int start, int end) {
            this.data = data;
            this.start = start;
            this.end = end;
        }
        
        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += data.get(i);
                }
                return sum;
            }
            
            int mid = (start + end) / 2;
            SumTask left = new SumTask(data, start, mid);
            SumTask right = new SumTask(data, mid, end);
            
            left.fork();
            long rightResult = right.compute();
            long leftResult = left.join();
            
            return leftResult + rightResult;
        }
    }
}

Диаграмма процесса

Исходный стрим:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]

Шаг 1: Создать Spliterator
Spliterator.trySplit() рекурсивно разбивает:

        [1..16]
        /     \
    [1..8]    [9..16]
    /    \    /    \
 [1..4] [5..8][9..12][13..16]

Шаг 2: Создать ForkJoinTask для каждого куска

Шаг 3: Выполнить в ForkJoinPool

Поток 1: [1..4]
Поток 2: [5..8]
Поток 3: [9..12]
Поток 4: [13..16]

Шаг 4: Комбинировать результаты

Поток 1: combine([1..4], [5..8])
Поток 2: combine([9..12], [13..16])

Поток 1: combine(результат из 1, результат из 2)

Финальный результат

Резюме механизма

  1. Spliterator — рекурсивно разбивает коллекцию на части
  2. ForkJoinTask — создаётся для каждой части
  3. ForkJoinPool.commonPool() — выполняет задачи в параллель
  4. Work-stealing — балансирует нагрузку между потоками
  5. Результаты комбинируются — в правильном порядке

Это позволяет параллельным стримам автоматически распараллеливать обработку данных без явного управления потоками.

Какой механизм используется под капотом у параллельного стрима? | PrepBro