← Назад к вопросам

Сколько потоков в ForkJoinPool?

2.7 Senior🔥 41 комментариев
#Stream API и функциональное программирование#Многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

ForkJoinPool: количество потоков и настройка

ForkJoinPool — это специализированный thread pool для параллельной обработки данных, оптимизированный для tasks, которые рекурсивно разбиваются на подзадачи (Divide-and-Conquer).

Сколько потоков в ForkJoinPool?

Ответ: По умолчанию количество потоков равно количеству ядер процессора

import java.util.concurrent.ForkJoinPool;

// Получить количество потоков в ForkJoinPool
ForkJoinPool pool = ForkJoinPool.commonPool();
int parallelism = pool.getParallelism();

System.out.println("Parallelism level: " + parallelism);
// На 4-ядерной системе выведет: Parallelism level: 4
// На 8-ядерной системе выведет: Parallelism level: 8

Как это работает

ForkJoinPool с 4 потоками (на 4-ядерном CPU):
┌─────────────────────────────────────────┐
│ ForkJoinPool (parallelism = 4)          │
├─────────────────────────────────────────┤
│ Thread-1  │ Thread-2  │ Thread-3  │ Thread-4 │
├─────────────────────────────────────────┤
│ Task queue (каждому потоку)             │
│ + Work-stealing queue (могут стянуть)   │
└─────────────────────────────────────────┘

Формула для размера ForkJoinPool

Parallelism = Runtime.getRuntime().availableProcessors() - 1

Обычно используется количество ядер минус 1, чтобы main thread имел возможность выполняться.

Два типа ForkJoinPool

1. Common ForkJoinPool (глобальный, singleton)

ForkJoinPool commonPool = ForkJoinPool.commonPool();
// Используется по умолчанию для parallel streams
// Используется в CompletableFuture
// НЕ нужно закрывать (shutdown)

int commonParallelism = ForkJoinPool.getCommonPoolParallelism();
System.out.println(commonParallelism);

2. Custom ForkJoinPool (создаёшь сам)

// Создаём свой pool с определённым parallelism
ForkJoinPool customPool = new ForkJoinPool(8);

// ОБЯЗАТЕЛЬНО закрыть
customPool.shutdown();
// или
customPool.shutdownNow();

Пример: Parallel Array Sum

import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

public class ParallelArraySum {
    // RecursiveTask — базовый класс для разделяемых задач
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 10000;
        private int[] array;
        private int start;
        private int end;
        
        SumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }
        
        @Override
        protected Long compute() {
            int length = end - start;
            
            // Если достаточно мало элементов — считаем прямо
            if (length <= THRESHOLD) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            }
            
            // Иначе — разбиваем на две подзадачи
            int mid = start + length / 2;
            
            SumTask leftTask = new SumTask(array, start, mid);
            SumTask rightTask = new SumTask(array, mid, end);
            
            // Fork: запускаем обе задачи параллельно
            leftTask.fork();
            rightTask.fork();
            
            // Join: ждём результаты
            long leftSum = leftTask.join();
            long rightSum = rightTask.join();
            
            return leftSum + rightSum;
        }
    }
    
    public static void main(String[] args) {
        int[] array = new int[100000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i;
        }
        
        // Используем common pool
        ForkJoinTask<Long> task = ForkJoinPool.commonPool()
            .submit(new SumTask(array, 0, array.length));
        
        long result = task.join();
        System.out.println("Sum: " + result);
        
        // Информация про pool
        System.out.println("Pool parallelism: " + 
            ForkJoinPool.getCommonPoolParallelism());
    }
}

Пример с Parallel Streams

int[] array = new int[1000000];
for (int i = 0; i < array.length; i++) {
    array[i] = i;
}

// Использует common ForkJoinPool
long sum = Arrays.stream(array)
    .parallel()  // Включаем параллелизм
    .mapToLong(x -> x)
    .sum();

System.out.println("Sum: " + sum);

Параллельный stream использует common ForkJoinPool с parallelism = количество ядер.

Как настроить размер ForkJoinPool?

Способ 1: System property

# При запуске JVM
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=16 MyApp

Способ 2: В коде (custom pool)

// Создаём pool с определённым размером
ForkJoinPool myPool = new ForkJoinPool(16);

// Используем его в parallel streams
int sum = myPool.invoke(new SumTask(array, 0, array.length));

Способ 3: Стратегические hints

public class CustomForkJoinPool {
    private static final ForkJoinPool pool = new ForkJoinPool(
        Runtime.getRuntime().availableProcessors() * 2  // Переsubscribe
    );
    
    public static ForkJoinPool getPool() {
        return pool;
    }
}

Work-Stealing алгоритм

Ключевая особенность ForkJoinPool — work-stealing queue:

Потоки в ForkJoinPool имеют свои очереди:
┌──────────────┬──────────────┬──────────────┬──────────────┐
│ Thread-1     │ Thread-2      │ Thread-3      │ Thread-4     │
│ Queue:       │ Queue:        │ Queue:        │ Queue:       │
│ [Task1]      │ [Task2]       │ []            │ [Task4, 5, 6]│
│ [Task7]      │ []            │ []            │ []           │
└──────────────┴──────────────┴──────────────┴──────────────┘
    WAITING         WAITING      IDLE           BUSY
       ↓              ↓                            ↓
       Thread-3 "крадёт" Task4 из очереди Thread-4

Это позволяет лучше распределить нагрузку: idle потоки могут взять задачу из очереди busy потока.

Performance considerations

Когда ForkJoinPool эффективен:

  • Divide-and-conquer задачи (merge sort, tree traversal)
  • Параллельные streams с большим объёмом данных
  • Рекурсивные алгоритмы
// Хорошо для ForkJoinPool
long sum = Arrays.stream(hugeArray)
    .parallel()
    .filter(x -> x > 100)
    .map(x -> x * 2)
    .sum();  // Много работы на большом объёме

Когда ForkJoinPool неэффективен:

  • Блокирующие операции (I/O, synchronization)
  • Очень мелкие задачи (overhead больше чем profit)
  • Зависимые задачи (deadlock risk)
// Плохо для ForkJoinPool
long sum = Arrays.stream(tinyArray)  // Только 10 элементов
    .parallel()
    .sum();

// Плохо: blocking operation
Stream<String> lines = Files.lines(file)
    .parallel()  // Заблокирует потоки на I/O
    .filter(...);

Типичные параметры

CPU cores     Common Pool parallelism
1             1
2             2
4             4
8             8
16            16
32            32

Контрольный вопрос

Ты знаешь, что:

  • ✓ ForkJoinPool по умолчанию имеет parallelism = number of cores
  • ✓ Это настраивается через system property или custom pool
  • ✓ Common pool используется для parallel streams и CompletableFuture
  • ✓ Work-stealing делает его эффективным для divide-and-conquer
  • ✓ НЕ подходит для blocking операций

Это основные знания про ForkJoinPool для интервью!