← Назад к вопросам
Какой механизм используется под капотом у параллельного стрима?
2.0 Middle🔥 131 комментариев
#Stream API и функциональное программирование#Многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Механизм параллельных стримов: ForkJoinPool
Основа: ForkJoinPool
Параллельные стримы используют ForkJoinPool — специальный пул потоков, оптимизированный для разделения работы и работы кражи (work-stealing).
// Когда вы пишите:
List<Integer> data = generateMillionIntegers();
long result = data.parallelStream()
.map(n -> n * 2)
.filter(n -> n > 100)
.reduce(0, Integer::sum);
// ПОД КАПОТОМ:
ForkJoinPool.commonPool().execute(() -> {
// Работа распределяется между потоками
// Используется ForkJoinTask
// Потоки крадут работу друг у друга (work-stealing)
});
Общий пул (Common ForkJoinPool)
Инициализация
public class ForkJoinPoolExample {
public static void main(String[] args) {
// Получить общий пул
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println("Parallelism level: " +
commonPool.getParallelism()); // Обычно = CPU cores - 1
// На 4-ядерном: 3
// На 8-ядерном: 7
// На 16-ядерном: 15
}
}
Конфигурация через JVM параметры
# Установить размер пула
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=8 MyApp
# Отключить асинхронный режим
java -Djava.util.concurrent.ForkJoinPool.common.asyncMode=true MyApp
ForkJoinTask
Иерархия задач
ForkJoinTask
├── RecursiveAction
│ ├── SumAction
│ ├── FilterAction
│ └── MapAction
└── RecursiveTask<R>
├── SumTask
├── CountTask
└── ...
Как работает RecursiveTask
public class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000; // Порог разделения
private int[] array;
private int start, end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// Если работа маленькая — выполни напрямую
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// Разделить (Fork)
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
// Отправить левую задачу в пул
leftTask.fork();
// Вычислить правую задачу в этом потоке
long rightResult = rightTask.compute();
// Получить результат левой задачи (Join)
long leftResult = leftTask.join();
return leftResult + rightResult;
}
public static void main(String[] args) {
int[] array = new int[100_000_000];
for (int i = 0; i < array.length; i++) {
array[i] = i;
}
ForkJoinPool pool = ForkJoinPool.commonPool();
SumTask task = new SumTask(array, 0, array.length);
Long result = pool.invoke(task);
System.out.println("Sum: " + result);
}
}
Work-stealing алгоритм
Проблема без work-stealing
Поток 1: Много работы (30 задач)
Поток 2: Мало работы (2 задачи)
Поток 3: Никакой работы (свободен)
Поток 4: Очень много работы (50 задач)
Результат: неравномерное распределение
Work-stealing решение
Каждый поток имеет двусторонний очередь (deque):
Поток 1: [T1] [T2] [T3] ← добавляет в конец
Поток 2: [T10] [T11] [T12] [T13] [T14]
Поток 3: (пусто, свободен)
Поток 4: [T30] [T31] ... [T60] ← много задач
Когда Поток 1 свободен:
- Он берёт задачу из начала своей очереди (свою)
- Если своя пуста — крадёт из конца чужой очереди (из Потока 4)
Когда Поток 3 свободен:
- Крадёт из Потока 4, затем из Потока 1
Результат: хорошее распределение!
Реализация work-stealing
public class WorkStealingExample {
private static class Task implements Runnable {
private int id;
private long duration;
public Task(int id, long duration) {
this.id = id;
this.duration = duration;
}
@Override
public void run() {
// Имитируем работу
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < duration) {
// Spin
}
System.out.println("Task " + id + " completed on " +
Thread.currentThread().getName());
}
}
public static void main(String[] args) throws InterruptedException {
// Work-stealing пул распределит задачи
ForkJoinPool pool = new ForkJoinPool(4); // 4 потока
// Задачи с разной длительностью
List<Task> tasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
tasks.add(new Task(i, 100 + i * 50)); // Разная длительность
}
// ForkJoinPool автоматически балансирует нагрузку
for (Task task : tasks) {
pool.execute(task);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.MINUTES);
}
}
Как параллельный стрим использует ForkJoinPool
Схема обработки
// Когда вы пишете:
list.parallelStream()
.filter(n -> n > 10)
.map(n -> n * 2)
.reduce(0, Integer::sum);
// Происходит следующее:
// 1. Создаётся StreamShape, определяется граф операций
stream = list.parallelStream()
.filter(n -> n > 10) // Операция 1
.map(n -> n * 2) // Операция 2
.reduce(0, Integer::sum); // Операция 3 (терминальная)
// 2. Стрим разбивается на куски (splits)
Spliterator spliterator = list.spliterator();
// Если 1000 элементов → может разбить на 4 части по 250
// 3. Для каждого куска создаётся ForkJoinTask
Task chunk1 = new FilterMapReduceTask(list, 0, 250);
Task chunk2 = new FilterMapReduceTask(list, 250, 500);
Task chunk3 = new FilterMapReduceTask(list, 500, 750);
Task chunk4 = new FilterMapReduceTask(list, 750, 1000);
// 4. Задачи выполняются параллельно в ForkJoinPool
ForkJoinPool.commonPool().invoke(combinedTask);
// 5. Результаты комбинируются
result = chunk1.join() + chunk2.join() + chunk3.join() + chunk4.join();
Внутренняя реализация
Spliterator для параллельного разбиения
public class SpliteratorExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
// Получить spliterator
Spliterator<Integer> spliterator = numbers.spliterator();
System.out.println("Characteristics: " +
spliterator.characteristics());
// Может включать: ORDERED, SIZED, SUBSIZED, etc.
// Попытка разбить
Spliterator<Integer> rightSplit = spliterator.trySplit();
// Оригинал получит элементы 0-3
// rightSplit получит элементы 4-7
// Это позволяет параллельному стриму
// разбить работу рекурсивно
}
}
// Пример Spliterator для массива:
public class ArraySpliterator {
public static void main(String[] args) {
int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
Spliterator.OfInt spliterator =
Arrays.spliterator(array);
// Левая часть: [1, 2, 3, 4, 5]
Spliterator.OfInt left = spliterator;
// Правая часть: [6, 7, 8, 9, 10]
Spliterator.OfInt right = spliterator.trySplit();
// Каждая часть обрабатывается отдельно
}
}
Поток выполнения на практике
public class DetailedExecutionFlow {
public static void main(String[] args) {
List<Integer> data = new ArrayList<>();
for (int i = 0; i < 100; i++) {
data.add(i);
}
// Параллельная обработка
int result = data.parallelStream()
.peek(n -> System.out.println(
"Processing " + n + " on " +
Thread.currentThread().getName()))
.filter(n -> n % 2 == 0) // Только чётные
.map(n -> n * 2) // Умножить на 2
.reduce(0, Integer::sum); // Сумма
System.out.println("Result: " + result);
}
}
// Вывод (примерный):
// Processing 0 on ForkJoinPool.commonPool-worker-1
// Processing 1 on ForkJoinPool.commonPool-worker-2
// Processing 2 on ForkJoinPool.commonPool-worker-3
// Processing 3 on ForkJoinPool.commonPool-worker-4
// Processing 4 on ForkJoinPool.commonPool-worker-1 // Поток 1 снова работает
// ...
Создание собственного ForkJoinPool
public class CustomForkJoinPool {
public static void main(String[] args) {
// Общий пул может быть узким местом в многопоточном коде
// Создайте свой пул для важных операций
ForkJoinPool customPool = new ForkJoinPool(8); // 8 потоков
List<Integer> data = new ArrayList<>();
for (int i = 0; i < 10_000_000; i++) {
data.add(i);
}
// Использовать свой пул
long result = customPool.invoke(
new SumTask(data, 0, data.size())
);
System.out.println("Result: " + result);
customPool.shutdown();
}
static class SumTask extends RecursiveTask<Long> {
private List<Integer> data;
private int start, end;
private static final int THRESHOLD = 1000;
SumTask(List<Integer> data, int start, int end) {
this.data = data;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += data.get(i);
}
return sum;
}
int mid = (start + end) / 2;
SumTask left = new SumTask(data, start, mid);
SumTask right = new SumTask(data, mid, end);
left.fork();
long rightResult = right.compute();
long leftResult = left.join();
return leftResult + rightResult;
}
}
}
Диаграмма процесса
Исходный стрим:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Шаг 1: Создать Spliterator
Spliterator.trySplit() рекурсивно разбивает:
[1..16]
/ \
[1..8] [9..16]
/ \ / \
[1..4] [5..8][9..12][13..16]
Шаг 2: Создать ForkJoinTask для каждого куска
Шаг 3: Выполнить в ForkJoinPool
Поток 1: [1..4]
Поток 2: [5..8]
Поток 3: [9..12]
Поток 4: [13..16]
Шаг 4: Комбинировать результаты
Поток 1: combine([1..4], [5..8])
Поток 2: combine([9..12], [13..16])
Поток 1: combine(результат из 1, результат из 2)
Финальный результат
Резюме механизма
- Spliterator — рекурсивно разбивает коллекцию на части
- ForkJoinTask — создаётся для каждой части
- ForkJoinPool.commonPool() — выполняет задачи в параллель
- Work-stealing — балансирует нагрузку между потоками
- Результаты комбинируются — в правильном порядке
Это позволяет параллельным стримам автоматически распараллеливать обработку данных без явного управления потоками.