← Назад к вопросам

Сколько потоков использует по умолчанию параллельный Stream в Stream API?

2.0 Middle🔥 101 комментариев
#Stream API и функциональное программирование#Многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Ответ

Параллельный Stream в Java по умолчанию использует столько потоков, сколько ядер в процессоре (ForkJoinPool.commonPool).

Точный механизм

import java.util.Arrays;
import java.util.stream.IntStream;

public class StreamThreadsExample {
    public static void main(String[] args) {
        // Узнаём количество доступных ядер
        int coreCount = Runtime.getRuntime().availableProcessors();
        System.out.println("Cores: " + coreCount);  // Например: 8
        
        // Параллельный Stream использует ForkJoinPool.commonPool()
        // Количество потоков = coreCount - 1 (для сохранения основного потока)
        int parallelismLevel = ForkJoinPool.getCommonPoolParallelism();
        System.out.println("Parallelism level: " + parallelismLevel);  // 7 для 8 ядер
    }
}

Как работает параллельный Stream

public class ParallelStreamDemo {
    public static void main(String[] args) {
        // Создание параллельного stream
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        
        // ❌ Обычный (последовательный) Stream
        numbers.stream()
            .map(n -> {
                System.out.println("Processing " + n + " in " + 
                    Thread.currentThread().getName());
                return n * 2;
            })
            .forEach(System.out::println);
        
        // Вывод: main thread обрабатывает все элементы
        // Processing 1 in main
        // Processing 2 in main
        // Processing 3 in main
        // ...
        
        System.out.println("\n--- PARALLEL STREAM ---\n");
        
        // ✅ Параллельный Stream
        numbers.parallelStream()
            .map(n -> {
                System.out.println("Processing " + n + " in " + 
                    Thread.currentThread().getName());
                return n * 2;
            })
            .forEach(System.out::println);
        
        // Вывод: несколько потоков из ForkJoinPool обрабатывают элементы параллельно
        // Processing 1 in ForkJoinPool.commonPool-worker-1
        // Processing 3 in ForkJoinPool.commonPool-worker-3
        // Processing 2 in ForkJoinPool.commonPool-worker-2
        // Processing 4 in main
        // ...
    }
}

ForkJoinPool.commonPool() детали

public class ForkJoinPoolAnalysis {
    public static void main(String[] args) {
        // Получить информацию о commonPool
        ForkJoinPool pool = ForkJoinPool.commonPool();
        
        System.out.println("Pool class: " + pool.getClass());
        // Output: class java.util.concurrent.ForkJoinPool
        
        System.out.println("Parallelism: " + pool.getParallelism());
        // Output: 7 (для 8-ядерного процессора)
        
        System.out.println("Active threads: " + pool.getActiveThreadCount());
        // Output: варьируется в зависимости от нагрузки
        
        System.out.println("Running threads: " + pool.getRunningThreadCount());
        // Output: количество потоков, выполняющих работу
        
        // Формула: parallelism = availableProcessors() - 1
        // Исключение: если coreCount == 1, parallelism = 1
        int cores = Runtime.getRuntime().availableProcessors();
        int expectedParallelism = cores == 1 ? 1 : cores - 1;
        System.out.println("Expected parallelism: " + expectedParallelism);
    }
}

Пример: работа с большим количеством элементов

public class ParallelStreamPerformance {
    public static void main(String[] args) {
        int size = 10_000_000;
        
        // Обычный Stream (последовательно)
        long startSeq = System.nanoTime();
        long sumSeq = IntStream.range(0, size)
            .map(n -> expensiveOperation(n))  // Дорогая операция
            .sum();
        long timeSeq = System.nanoTime() - startSeq;
        
        System.out.println("Sequential: " + (timeSeq / 1_000_000) + "ms");
        
        // Параллельный Stream
        long startPar = System.nanoTime();
        long sumPar = IntStream.range(0, size)
            .parallel()
            .map(n -> expensiveOperation(n))
            .sum();
        long timePar = System.nanoTime() - startPar;
        
        System.out.println("Parallel: " + (timePar / 1_000_000) + "ms");
        System.out.println("Speedup: " + (double) timeSeq / timePar + "x");
        // Speedup близко к количеству ядер (например 7.8x для 8 ядер)
    }
    
    private static int expensiveOperation(int n) {
        // Имитация дорогой операции
        for (int i = 0; i < 1000; i++) {
            n = n * 2 / 3;
        }
        return n;
    }
}

Контроль количества потоков

По умолчанию нельзя изменить commonPool, но можно создать свой ForkJoinPool:

public class CustomForkJoinPool {
    public static void main(String[] args) throws Exception {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        
        // ❌ Нельзя просто вызвать parallel() с кастомным pool
        // numbers.stream().parallel() всегда использует commonPool
        
        // ✅ Но можно обернуть в ForkJoinTask
        ForkJoinPool customPool = new ForkJoinPool(4);  // 4 потока вместо 7
        
        Integer result = customPool.invoke(
            new RecursiveTask<Integer>() {
                @Override
                protected Integer compute() {
                    // Теперь этот код выполнится в customPool с 4 потоками
                    return numbers.parallelStream()
                        .map(n -> {
                            System.out.println("Thread: " + 
                                Thread.currentThread().getName());
                            return n * 2;
                        })
                        .reduce(0, Integer::sum);
                }
            }
        );
        
        System.out.println("Result: " + result);
        customPool.shutdown();
    }
}

System Property для контроля parallelism

# Запуск Java с кастомным parallelism level
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 MyApp

# Это установит commonPool parallelism на 4 вместо автоматического
public class CheckSystemProperty {
    public static void main(String[] args) {
        String prop = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
        System.out.println("Custom parallelism: " + prop);  // null если не установлено
        
        // Если null, используется: max(1, Runtime.getRuntime().availableProcessors() - 1)
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        System.out.println("Actual parallelism: " + parallelism);
    }
}

Когда параллельный Stream МЕДЛЕННЕЕ

public class ParallelVsSequential {
    public static void main(String[] args) {
        // ❌ Параллельный Stream медленнее для:
        
        // 1. Маленьких коллекций (overhead > benefit)
        List<Integer> small = Arrays.asList(1, 2, 3);
        long start = System.nanoTime();
        small.parallelStream()  // Медленнее! Overhead создания потоков > gain
            .map(n -> n * 2)
            .collect(Collectors.toList());
        System.out.println("Parallel time: " + (System.nanoTime() - start));
        
        // ✅ Параллельный Stream эффективен для:
        List<Integer> large = IntStream.range(0, 1_000_000)
            .boxed()
            .collect(Collectors.toList());
        
        start = System.nanoTime();
        large.parallelStream()
            .map(n -> expensiveOp(n))
            .collect(Collectors.toList());
        System.out.println("Parallel time (large): " + (System.nanoTime() - start));
    }
    
    private static int expensiveOp(int n) {
        for (int i = 0; i < 10000; i++) {
            n = n * 2 / 3;
        }
        return n;
    }
}

Рекомендации по использованию

public class BestPractices {
    public static void main(String[] args) {
        // ✅ ИСПОЛЬЗУЙ параллельный Stream для:
        // 1. Больших коллекций (> 10,000 элементов)
        // 2. Дорогих операций (каждый элемент обрабатывается долго)
        // 3. Независимых операций (нет shared state)
        
        List<User> users = loadMillionUsers();  // 1 миллион пользователей
        
        // Дорогая операция: трансформация данных
        List<UserDTO> dtos = users.parallelStream()
            .map(user -> {
                // Expensive: запрос в БД, внешний API
                return new UserDTO(
                    user.getId(),
                    user.getName(),
                    fetchUserPreferences(user.getId())  // Дорого!
                );
            })
            .filter(dto -> dto.isActive())
            .collect(Collectors.toList());
        
        // ❌ НЕ ИСПОЛЬЗУЙ параллельный Stream для:
        // 1. Маленьких коллекций (< 1,000)
        // 2. Простых операций (например, просто умножение на число)
        // 3. Ordered Stream с collect()
        // 4. Если нужна гарантированная последовательность элементов
        
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        
        // ❌ Плохая идея
        List<Integer> doubled = numbers.parallelStream()
            .map(n -> n * 2)
            .collect(Collectors.toList());  // Overhead > benefit
        
        // ✅ Хорошо
        List<Integer> doubled2 = numbers.stream()
            .map(n -> n * 2)
            .collect(Collectors.toList());
    }
    
    private static UserPreferences fetchUserPreferences(Long userId) {
        // Дорогая операция
        try { Thread.sleep(10); } catch (Exception e) {}
        return new UserPreferences();
    }
    
    private static List<User> loadMillionUsers() {
        return IntStream.range(0, 1_000_000)
            .mapToObj(i -> new User(i, "User" + i))
            .collect(Collectors.toList());
    }
    
    static class User {
        int id;
        String name;
        User(int id, String name) { this.id = id; this.name = name; }
        int getId() { return id; }
        String getName() { return name; }
    }
    
    static class UserDTO {
        int id;
        String name;
        UserPreferences prefs;
        UserDTO(int id, String name, UserPreferences prefs) {
            this.id = id; this.name = name; this.prefs = prefs;
        }
        boolean isActive() { return true; }
    }
    
    static class UserPreferences {}
}

Мониторинг потоков параллельного Stream

public class MonitorParallelStream {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        
        // Запустить параллельный Stream и отслеживать потоки
        Future<?> future = executor.submit(() -> {
            List<Integer> numbers = IntStream.range(0, 1000000)
                .boxed()
                .collect(Collectors.toList());
            
            numbers.parallelStream()
                .map(n -> {
                    System.out.println("Processing " + n + " in " + 
                        Thread.currentThread().getName());
                    try { Thread.sleep(100); } catch (Exception e) {}
                    return n * 2;
                })
                .collect(Collectors.toList());
        });
        
        // Показать активные потоки
        while (!future.isDone()) {
            ThreadGroup group = Thread.currentThread().getThreadGroup();
            System.out.println("Active threads: " + group.activeCount());
            Thread.sleep(500);
        }
        
        executor.shutdown();
    }
}

Итог

📊 Количество потоков в параллельном Stream:
  = ForkJoinPool.getCommonPoolParallelism()
  = max(1, Runtime.getRuntime().availableProcessors() - 1)
  
Примеры:
  - 4-ядерный CPU → 3 потока
  - 8-ядерный CPU → 7 потоков
  - 16-ядерный CPU → 15 потоков

✅ ИСПОЛЬЗУЙ .parallelStream() для:
  - Больших коллекций (> 10k элементов)
  - Дорогих операций (I/O, вычисления)
  - Независимой обработки элементов

❌ НЕ ИСПОЛЬЗУЙ для:
  - Маленьких коллекций (< 1k)
  - Простых операций
  - Если нужна гарантированная последовательность

⚙️ Можно менять через:
  -Djava.util.concurrent.ForkJoinPool.common.parallelism=4

💡 Правило: parallelism = availableProcessors - 1
   (исключение: если процессор 1, parallelism = 1)