Сколько потоков в ForkJoinPool?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
ForkJoinPool: количество потоков и настройка
ForkJoinPool — это специализированный thread pool для параллельной обработки данных, оптимизированный для tasks, которые рекурсивно разбиваются на подзадачи (Divide-and-Conquer).
Сколько потоков в ForkJoinPool?
Ответ: По умолчанию количество потоков равно количеству ядер процессора
import java.util.concurrent.ForkJoinPool;
// Получить количество потоков в ForkJoinPool
ForkJoinPool pool = ForkJoinPool.commonPool();
int parallelism = pool.getParallelism();
System.out.println("Parallelism level: " + parallelism);
// На 4-ядерной системе выведет: Parallelism level: 4
// На 8-ядерной системе выведет: Parallelism level: 8
Как это работает
ForkJoinPool с 4 потоками (на 4-ядерном CPU):
┌─────────────────────────────────────────┐
│ ForkJoinPool (parallelism = 4) │
├─────────────────────────────────────────┤
│ Thread-1 │ Thread-2 │ Thread-3 │ Thread-4 │
├─────────────────────────────────────────┤
│ Task queue (каждому потоку) │
│ + Work-stealing queue (могут стянуть) │
└─────────────────────────────────────────┘
Формула для размера ForkJoinPool
Parallelism = Runtime.getRuntime().availableProcessors() - 1
Обычно используется количество ядер минус 1, чтобы main thread имел возможность выполняться.
Два типа ForkJoinPool
1. Common ForkJoinPool (глобальный, singleton)
ForkJoinPool commonPool = ForkJoinPool.commonPool();
// Используется по умолчанию для parallel streams
// Используется в CompletableFuture
// НЕ нужно закрывать (shutdown)
int commonParallelism = ForkJoinPool.getCommonPoolParallelism();
System.out.println(commonParallelism);
2. Custom ForkJoinPool (создаёшь сам)
// Создаём свой pool с определённым parallelism
ForkJoinPool customPool = new ForkJoinPool(8);
// ОБЯЗАТЕЛЬНО закрыть
customPool.shutdown();
// или
customPool.shutdownNow();
Пример: Parallel Array Sum
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class ParallelArraySum {
// RecursiveTask — базовый класс для разделяемых задач
static class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000;
private int[] array;
private int start;
private int end;
SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
// Если достаточно мало элементов — считаем прямо
if (length <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// Иначе — разбиваем на две подзадачи
int mid = start + length / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
// Fork: запускаем обе задачи параллельно
leftTask.fork();
rightTask.fork();
// Join: ждём результаты
long leftSum = leftTask.join();
long rightSum = rightTask.join();
return leftSum + rightSum;
}
}
public static void main(String[] args) {
int[] array = new int[100000];
for (int i = 0; i < array.length; i++) {
array[i] = i;
}
// Используем common pool
ForkJoinTask<Long> task = ForkJoinPool.commonPool()
.submit(new SumTask(array, 0, array.length));
long result = task.join();
System.out.println("Sum: " + result);
// Информация про pool
System.out.println("Pool parallelism: " +
ForkJoinPool.getCommonPoolParallelism());
}
}
Пример с Parallel Streams
int[] array = new int[1000000];
for (int i = 0; i < array.length; i++) {
array[i] = i;
}
// Использует common ForkJoinPool
long sum = Arrays.stream(array)
.parallel() // Включаем параллелизм
.mapToLong(x -> x)
.sum();
System.out.println("Sum: " + sum);
Параллельный stream использует common ForkJoinPool с parallelism = количество ядер.
Как настроить размер ForkJoinPool?
Способ 1: System property
# При запуске JVM
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=16 MyApp
Способ 2: В коде (custom pool)
// Создаём pool с определённым размером
ForkJoinPool myPool = new ForkJoinPool(16);
// Используем его в parallel streams
int sum = myPool.invoke(new SumTask(array, 0, array.length));
Способ 3: Стратегические hints
public class CustomForkJoinPool {
private static final ForkJoinPool pool = new ForkJoinPool(
Runtime.getRuntime().availableProcessors() * 2 // Переsubscribe
);
public static ForkJoinPool getPool() {
return pool;
}
}
Work-Stealing алгоритм
Ключевая особенность ForkJoinPool — work-stealing queue:
Потоки в ForkJoinPool имеют свои очереди:
┌──────────────┬──────────────┬──────────────┬──────────────┐
│ Thread-1 │ Thread-2 │ Thread-3 │ Thread-4 │
│ Queue: │ Queue: │ Queue: │ Queue: │
│ [Task1] │ [Task2] │ [] │ [Task4, 5, 6]│
│ [Task7] │ [] │ [] │ [] │
└──────────────┴──────────────┴──────────────┴──────────────┘
WAITING WAITING IDLE BUSY
↓ ↓ ↓
Thread-3 "крадёт" Task4 из очереди Thread-4
Это позволяет лучше распределить нагрузку: idle потоки могут взять задачу из очереди busy потока.
Performance considerations
Когда ForkJoinPool эффективен:
- Divide-and-conquer задачи (merge sort, tree traversal)
- Параллельные streams с большим объёмом данных
- Рекурсивные алгоритмы
// Хорошо для ForkJoinPool
long sum = Arrays.stream(hugeArray)
.parallel()
.filter(x -> x > 100)
.map(x -> x * 2)
.sum(); // Много работы на большом объёме
Когда ForkJoinPool неэффективен:
- Блокирующие операции (I/O, synchronization)
- Очень мелкие задачи (overhead больше чем profit)
- Зависимые задачи (deadlock risk)
// Плохо для ForkJoinPool
long sum = Arrays.stream(tinyArray) // Только 10 элементов
.parallel()
.sum();
// Плохо: blocking operation
Stream<String> lines = Files.lines(file)
.parallel() // Заблокирует потоки на I/O
.filter(...);
Типичные параметры
CPU cores Common Pool parallelism
1 1
2 2
4 4
8 8
16 16
32 32
Контрольный вопрос
Ты знаешь, что:
- ✓ ForkJoinPool по умолчанию имеет parallelism = number of cores
- ✓ Это настраивается через system property или custom pool
- ✓ Common pool используется для parallel streams и CompletableFuture
- ✓ Work-stealing делает его эффективным для divide-and-conquer
- ✓ НЕ подходит для blocking операций
Это основные знания про ForkJoinPool для интервью!