← Назад к вопросам

Что находится под капотом Parallel Stream

2.0 Middle🔥 151 комментариев
#Stream API и функциональное программирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Parallel Stream в Java

Parallel streams — это мощный инструмент, но его нужно понимать. Много разработчиков используют его без понимания того, что происходит под капотом, что приводит к проблемам.

Основа: ForkJoinPool

Под капотом parallel stream используют ForkJoinPool — пул потоков, специально разработанный для work-stealing алгоритма.

// По умолчанию используется общий (shared) ForkJoinPool
ForkJoinPool pool = ForkJoinPool.commonPool();
System.out.println(pool.getParallelism()); // Количество потоков = кол-во CPU

// Обычное использование parallel stream
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
int sum = numbers.parallelStream()
    .mapToInt(Integer::intValue)
    .sum();

ForkJoin модель

ForkJoinPool использует "divide-and-conquer" стратегию:

  1. Fork — разделяю задачу на подзадачи
  2. Join — жду результатов подзадач
  3. Combine — объединяю результаты
// Пример с ForkJoinTask
public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1000;
    private int[] array;
    private int start;
    private int end;
    
    public SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Long compute() {
        // Если диапазон маленький, вычисляю напрямую
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        }
        
        // Иначе разделяю на две задачи
        int mid = start + (end - start) / 2;
        SumTask leftTask = new SumTask(array, start, mid);
        SumTask rightTask = new SumTask(array, mid, end);
        
        // Fork - запускаю асинхронно
        leftTask.fork();
        
        // Рекурсивно вычисляю вторую половину
        long rightResult = rightTask.compute();
        
        // Join - жду результат первой половины
        long leftResult = leftTask.join();
        
        return leftResult + rightResult;
    }
}

// Использование
int[] array = new int[10_000_000];
SumTask task = new SumTask(array, 0, array.length);
long sum = ForkJoinPool.commonPool().invoke(task);

Work-Stealing алгоритм

Это самое интересное в ForkJoinPool. Каждый поток имеет собственную очередь задач (deque):

// Сценарий:
// Thread 1 (CPU 1): [Task A1, Task A2, Task A3] - очень занят
// Thread 2 (CPU 2): [] - ничего не делает
// Thread 3 (CPU 3): [Task C1, Task C2] - занят
// Thread 4 (CPU 4): [] - ничего не делает

// Work-stealing: Thread 2 и Thread 4 "крадут" задачи из конца очереди других потоков
// Thread 1 (CPU 1): [Task A1, Task A2] <- стащили A3
// Thread 2 (CPU 2): [Task A3] <- украл из Thread 1
// Thread 3 (CPU 3): [Task C1] <- стащили C2
// Thread 4 (CPU 4): [Task C2] <- украл из Thread 3

// Результат: все потоки работают, нет idle time

Как работает parallel stream

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

int result = numbers.parallelStream()
    .filter(n -> n > 2)              // Intermediate operation
    .map(n -> n * 2)                 // Intermediate operation
    .reduce(0, Integer::sum);        // Terminal operation

// Под капотом:
// 1. Разделяет список на несколько подсписков (количество = количество потоков)
// 2. Создаёт ForkJoinTask для каждого подсписка
// 3. Каждый поток обрабатывает свой подсписок параллельно
// 4. Результаты объединяются в конце

Практический пример с spliterator

public class ParallelStreamExample {
    
    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 1; i <= 1_000_000; i++) {
            numbers.add(i);
        }
        
        // Sequential - выполняется в одном потоке
        long start = System.currentTimeMillis();
        int sequentialSum = numbers.stream()
            .filter(n -> n % 2 == 0)
            .map(n -> n * 2)
            .reduce(0, Integer::sum);
        long sequentialTime = System.currentTimeMillis() - start;
        
        // Parallel - выполняется в нескольких потоках
        start = System.currentTimeMillis();
        int parallelSum = numbers.parallelStream()
            .filter(n -> n % 2 == 0)
            .map(n -> n * 2)
            .reduce(0, Integer::sum);
        long parallelTime = System.currentTimeMillis() - start;
        
        System.out.println("Sequential: " + sequentialTime + "ms");
        System.out.println("Parallel: " + parallelTime + "ms");
    }
}

Когда parallel stream имеет смысл

Плюсы:

  • Огромные коллекции (100K+ элементов)
  • Дорогие операции (например, вызовы API)
  • CPU-bound задачи

Минусы:

  • Overhead создания потоков может быть больше выигрыша
  • Shared ForkJoinPool может быть перегружен
  • Не потокобезопасные операции приведут к ошибкам
// Плохой пример - не имеет смысла параллелизм
int sum = Arrays.asList(1, 2, 3, 4, 5)
    .parallelStream()
    .reduce(0, Integer::sum); // Overhead > выигрыш

// Хороший пример - real use case
List<ApiResponse> responses = largeList.parallelStream()
    .map(this::callExpensiveApi)  // Дорогая операция
    .filter(Objects::nonNull)
    .collect(Collectors.toList());

Potential Issues

1. Shared ForkJoinPool может быть bottleneck:

// Проблема: разные задачи конкурируют за один пул
ExecutorService executor = ForkJoinPool.commonPool();

// Task 1 использует параллельный stream
executor.submit(() -> {
    List<Integer> list = generateBigList();
    list.parallelStream().forEach(this::expensiveOperation);
});

// Task 2 тоже использует параллельный stream
executor.submit(() -> {
    List<Integer> list = generateBigList();
    list.parallelStream().forEach(this::anotherExpensiveOperation);
});

// Решение: используй свой ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.invoke(ForkJoinTask.adapt(() -> {
    list.parallelStream().forEach(this::expensiveOperation);
}));

2. Не потокобезопасные операции:

// Плохо - race condition!
List<String> result = new ArrayList<>();
list.parallelStream()
    .map(n -> n.toString())
    .forEach(result::add);  // ArrayList не потокобезопасен!

// Правильно
List<String> result = list.parallelStream()
    .map(n -> n.toString())
    .collect(Collectors.toList()); // Потокобезопасно

3. Неопределённый порядок:

// Порядок может быть иным в parallelStream
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

// Sequential: гарантирует порядок
numbers.stream()
    .forEach(System.out::println);  // 1, 2, 3, 4, 5

// Parallel: порядок не гарантирован
numbers.parallelStream()
    .forEach(System.out::println);  // Может быть: 3, 1, 5, 2, 4

// Если нужен порядок:
numbers.parallelStream()
    .forEachOrdered(System::out::println);  // 1, 2, 3, 4, 5

Benchmarking Parallel Stream

// JMH benchmark
@Benchmark
public int sequential() {
    return list.stream()
        .filter(n -> n % 2 == 0)
        .reduce(0, Integer::sum);
}

@Benchmark
public int parallel() {
    return list.parallelStream()
        .filter(n -> n % 2 == 0)
        .reduce(0, Integer::sum);
}

// Result:
// Sequential: 1.2ms
// Parallel: 5.8ms (медленнее из-за overhead!)

Лучшие практики

  1. Используй parallel только для больших данных (100K+ элементов)
  2. Избегай дорогих операций в filter — это выполняется последовательно на каждом потоке
  3. Используй собственный ForkJoinPool для критичных задач
  4. Всегда бенчмарь — параллелизм может быть медленнее
  5. Не используй stateful operations (forEach с внешним состоянием)
  6. Помни о GC pressure — больше потоков = больше мусора

Заключение

Parallel streams — это не волшебство. Они основаны на ForkJoinPool с work-stealing алгоритмом. Важно понимать, когда они действительно помогают, и всегда мерить производительность реальными бенчмарками, а не предположениями.