← Назад к вопросам
Сколько потоков использует по умолчанию параллельный Stream в Stream API?
2.0 Middle🔥 101 комментариев
#Stream API и функциональное программирование#Многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Ответ
Параллельный Stream в Java по умолчанию использует столько потоков, сколько ядер в процессоре (ForkJoinPool.commonPool).
Точный механизм
import java.util.Arrays;
import java.util.stream.IntStream;
public class StreamThreadsExample {
public static void main(String[] args) {
// Узнаём количество доступных ядер
int coreCount = Runtime.getRuntime().availableProcessors();
System.out.println("Cores: " + coreCount); // Например: 8
// Параллельный Stream использует ForkJoinPool.commonPool()
// Количество потоков = coreCount - 1 (для сохранения основного потока)
int parallelismLevel = ForkJoinPool.getCommonPoolParallelism();
System.out.println("Parallelism level: " + parallelismLevel); // 7 для 8 ядер
}
}
Как работает параллельный Stream
public class ParallelStreamDemo {
public static void main(String[] args) {
// Создание параллельного stream
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
// ❌ Обычный (последовательный) Stream
numbers.stream()
.map(n -> {
System.out.println("Processing " + n + " in " +
Thread.currentThread().getName());
return n * 2;
})
.forEach(System.out::println);
// Вывод: main thread обрабатывает все элементы
// Processing 1 in main
// Processing 2 in main
// Processing 3 in main
// ...
System.out.println("\n--- PARALLEL STREAM ---\n");
// ✅ Параллельный Stream
numbers.parallelStream()
.map(n -> {
System.out.println("Processing " + n + " in " +
Thread.currentThread().getName());
return n * 2;
})
.forEach(System.out::println);
// Вывод: несколько потоков из ForkJoinPool обрабатывают элементы параллельно
// Processing 1 in ForkJoinPool.commonPool-worker-1
// Processing 3 in ForkJoinPool.commonPool-worker-3
// Processing 2 in ForkJoinPool.commonPool-worker-2
// Processing 4 in main
// ...
}
}
ForkJoinPool.commonPool() детали
public class ForkJoinPoolAnalysis {
public static void main(String[] args) {
// Получить информацию о commonPool
ForkJoinPool pool = ForkJoinPool.commonPool();
System.out.println("Pool class: " + pool.getClass());
// Output: class java.util.concurrent.ForkJoinPool
System.out.println("Parallelism: " + pool.getParallelism());
// Output: 7 (для 8-ядерного процессора)
System.out.println("Active threads: " + pool.getActiveThreadCount());
// Output: варьируется в зависимости от нагрузки
System.out.println("Running threads: " + pool.getRunningThreadCount());
// Output: количество потоков, выполняющих работу
// Формула: parallelism = availableProcessors() - 1
// Исключение: если coreCount == 1, parallelism = 1
int cores = Runtime.getRuntime().availableProcessors();
int expectedParallelism = cores == 1 ? 1 : cores - 1;
System.out.println("Expected parallelism: " + expectedParallelism);
}
}
Пример: работа с большим количеством элементов
public class ParallelStreamPerformance {
public static void main(String[] args) {
int size = 10_000_000;
// Обычный Stream (последовательно)
long startSeq = System.nanoTime();
long sumSeq = IntStream.range(0, size)
.map(n -> expensiveOperation(n)) // Дорогая операция
.sum();
long timeSeq = System.nanoTime() - startSeq;
System.out.println("Sequential: " + (timeSeq / 1_000_000) + "ms");
// Параллельный Stream
long startPar = System.nanoTime();
long sumPar = IntStream.range(0, size)
.parallel()
.map(n -> expensiveOperation(n))
.sum();
long timePar = System.nanoTime() - startPar;
System.out.println("Parallel: " + (timePar / 1_000_000) + "ms");
System.out.println("Speedup: " + (double) timeSeq / timePar + "x");
// Speedup близко к количеству ядер (например 7.8x для 8 ядер)
}
private static int expensiveOperation(int n) {
// Имитация дорогой операции
for (int i = 0; i < 1000; i++) {
n = n * 2 / 3;
}
return n;
}
}
Контроль количества потоков
По умолчанию нельзя изменить commonPool, но можно создать свой ForkJoinPool:
public class CustomForkJoinPool {
public static void main(String[] args) throws Exception {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
// ❌ Нельзя просто вызвать parallel() с кастомным pool
// numbers.stream().parallel() всегда использует commonPool
// ✅ Но можно обернуть в ForkJoinTask
ForkJoinPool customPool = new ForkJoinPool(4); // 4 потока вместо 7
Integer result = customPool.invoke(
new RecursiveTask<Integer>() {
@Override
protected Integer compute() {
// Теперь этот код выполнится в customPool с 4 потоками
return numbers.parallelStream()
.map(n -> {
System.out.println("Thread: " +
Thread.currentThread().getName());
return n * 2;
})
.reduce(0, Integer::sum);
}
}
);
System.out.println("Result: " + result);
customPool.shutdown();
}
}
System Property для контроля parallelism
# Запуск Java с кастомным parallelism level
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 MyApp
# Это установит commonPool parallelism на 4 вместо автоматического
public class CheckSystemProperty {
public static void main(String[] args) {
String prop = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
System.out.println("Custom parallelism: " + prop); // null если не установлено
// Если null, используется: max(1, Runtime.getRuntime().availableProcessors() - 1)
int parallelism = ForkJoinPool.getCommonPoolParallelism();
System.out.println("Actual parallelism: " + parallelism);
}
}
Когда параллельный Stream МЕДЛЕННЕЕ
public class ParallelVsSequential {
public static void main(String[] args) {
// ❌ Параллельный Stream медленнее для:
// 1. Маленьких коллекций (overhead > benefit)
List<Integer> small = Arrays.asList(1, 2, 3);
long start = System.nanoTime();
small.parallelStream() // Медленнее! Overhead создания потоков > gain
.map(n -> n * 2)
.collect(Collectors.toList());
System.out.println("Parallel time: " + (System.nanoTime() - start));
// ✅ Параллельный Stream эффективен для:
List<Integer> large = IntStream.range(0, 1_000_000)
.boxed()
.collect(Collectors.toList());
start = System.nanoTime();
large.parallelStream()
.map(n -> expensiveOp(n))
.collect(Collectors.toList());
System.out.println("Parallel time (large): " + (System.nanoTime() - start));
}
private static int expensiveOp(int n) {
for (int i = 0; i < 10000; i++) {
n = n * 2 / 3;
}
return n;
}
}
Рекомендации по использованию
public class BestPractices {
public static void main(String[] args) {
// ✅ ИСПОЛЬЗУЙ параллельный Stream для:
// 1. Больших коллекций (> 10,000 элементов)
// 2. Дорогих операций (каждый элемент обрабатывается долго)
// 3. Независимых операций (нет shared state)
List<User> users = loadMillionUsers(); // 1 миллион пользователей
// Дорогая операция: трансформация данных
List<UserDTO> dtos = users.parallelStream()
.map(user -> {
// Expensive: запрос в БД, внешний API
return new UserDTO(
user.getId(),
user.getName(),
fetchUserPreferences(user.getId()) // Дорого!
);
})
.filter(dto -> dto.isActive())
.collect(Collectors.toList());
// ❌ НЕ ИСПОЛЬЗУЙ параллельный Stream для:
// 1. Маленьких коллекций (< 1,000)
// 2. Простых операций (например, просто умножение на число)
// 3. Ordered Stream с collect()
// 4. Если нужна гарантированная последовательность элементов
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
// ❌ Плохая идея
List<Integer> doubled = numbers.parallelStream()
.map(n -> n * 2)
.collect(Collectors.toList()); // Overhead > benefit
// ✅ Хорошо
List<Integer> doubled2 = numbers.stream()
.map(n -> n * 2)
.collect(Collectors.toList());
}
private static UserPreferences fetchUserPreferences(Long userId) {
// Дорогая операция
try { Thread.sleep(10); } catch (Exception e) {}
return new UserPreferences();
}
private static List<User> loadMillionUsers() {
return IntStream.range(0, 1_000_000)
.mapToObj(i -> new User(i, "User" + i))
.collect(Collectors.toList());
}
static class User {
int id;
String name;
User(int id, String name) { this.id = id; this.name = name; }
int getId() { return id; }
String getName() { return name; }
}
static class UserDTO {
int id;
String name;
UserPreferences prefs;
UserDTO(int id, String name, UserPreferences prefs) {
this.id = id; this.name = name; this.prefs = prefs;
}
boolean isActive() { return true; }
}
static class UserPreferences {}
}
Мониторинг потоков параллельного Stream
public class MonitorParallelStream {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(4);
// Запустить параллельный Stream и отслеживать потоки
Future<?> future = executor.submit(() -> {
List<Integer> numbers = IntStream.range(0, 1000000)
.boxed()
.collect(Collectors.toList());
numbers.parallelStream()
.map(n -> {
System.out.println("Processing " + n + " in " +
Thread.currentThread().getName());
try { Thread.sleep(100); } catch (Exception e) {}
return n * 2;
})
.collect(Collectors.toList());
});
// Показать активные потоки
while (!future.isDone()) {
ThreadGroup group = Thread.currentThread().getThreadGroup();
System.out.println("Active threads: " + group.activeCount());
Thread.sleep(500);
}
executor.shutdown();
}
}
Итог
📊 Количество потоков в параллельном Stream:
= ForkJoinPool.getCommonPoolParallelism()
= max(1, Runtime.getRuntime().availableProcessors() - 1)
Примеры:
- 4-ядерный CPU → 3 потока
- 8-ядерный CPU → 7 потоков
- 16-ядерный CPU → 15 потоков
✅ ИСПОЛЬЗУЙ .parallelStream() для:
- Больших коллекций (> 10k элементов)
- Дорогих операций (I/O, вычисления)
- Независимой обработки элементов
❌ НЕ ИСПОЛЬЗУЙ для:
- Маленьких коллекций (< 1k)
- Простых операций
- Если нужна гарантированная последовательность
⚙️ Можно менять через:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=4
💡 Правило: parallelism = availableProcessors - 1
(исключение: если процессор 1, parallelism = 1)