← Назад к вопросам

Где может пригодиться Work Stealing?

2.0 Middle🔥 131 комментариев
#Основы Java

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Work Stealing: где и почему это пригодится

Work Stealing — это техника распределения задач в многопоточных системах, которая решает проблему неравномерной нагрузки на потоки.

Что такое Work Stealing?

Work Stealing (кража работы) — это алгоритм планирования, при котором свободный поток может "украсть" задачу из очереди другого потока, вместо того чтобы простаивать.

Без Work Stealing:
┌─────────┬─────────┬─────────┬─────────┐
│ Thread1 │ Thread2 │ Thread3 │ Thread4 │
├─────────┼─────────┼─────────┼─────────┤
│ [Task1] │ [Task2] │ [Task3] │         │
│ [Task4] │ [Task5] │         │  IDLE!  │  ← Thread4 ничего не делает
│ [Task6] │         │         │         │
└─────────┴─────────┴─────────┴─────────┘

С Work Stealing:
┌─────────┬─────────┬─────────┬─────────┐
│ Thread1 │ Thread2 │ Thread3 │ Thread4 │
├─────────┼─────────┼─────────┼─────────┤
│ [Task1] │ [Task2] │ [Task3] │ [Task4] │  ← Thread4 украл Task4
│ [Task5] │ [Task6] │         │         │
└─────────┴─────────┴─────────┴─────────┘

Где пригодится Work Stealing?

1. ForkJoinPool (рекомендуемое использование)

Java предоставляет ForkJoinPool, который использует work stealing:

public class WorkStealingExample {
    
    public static void main(String[] args) {
        // ForkJoinPool использует work stealing по умолчанию
        ForkJoinPool pool = ForkJoinPool.commonPool();
        
        // Или создаём свой pool
        ForkJoinPool customPool = new ForkJoinPool(
            Runtime.getRuntime().availableProcessors()
        );
        
        // Выполняем divide-and-conquer задачу
        int[] array = new int[1000000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i;
        }
        
        // Work stealing эффективен для параллельных вычислений
        long sum = pool.invoke(
            new RecursiveTask<Long>() {
                protected Long compute() {
                    // Implementation
                    return 0L;
                }
            }
        );
    }
}

2. Параллельные потоки данных (Parallel Streams)

Parallel Streams используют ForkJoinPool с work stealing:

// Параллельная обработка данных
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

// parallelStream использует work stealing для распределения
int sum = numbers.parallelStream()
    .map(n -> {
        // Некоторые операции выполняются долго
        try {
            Thread.sleep((long)(Math.random() * 1000));
        } catch (InterruptedException e) {}
        return n * 2;
    })
    .sum();

// Если без work stealing:
// Thread1 получит элементы 1,2,3 (долгие операции)
// Thread2 получит элементы 5,6,7,8 (быстро закончит и будет ждать)

// С work stealing:
// Thread2 украдёт элементы из очереди Thread1
// Потоки будут загружены равномерно

3. Recursive Task Division (Рекурсивное разбиение задач)

public class RecursiveArraySum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private int[] array;
    private int start;
    private int end;
    
    public RecursiveArraySum(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // Базовый случай — простые вычисления
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            // Рекурсивный случай — разделяем на подзадачи
            int mid = (start + end) / 2;
            
            RecursiveArraySum left = new RecursiveArraySum(array, start, mid);
            RecursiveArraySum right = new RecursiveArraySum(array, mid, end);
            
            left.fork();  // Отправляем в очередь
            long rightResult = right.compute();  // Вычисляем локально
            long leftResult = left.join();       // Ждём результата
            
            return leftResult + rightResult;
        }
    }
    
    // Использование
    public static void main(String[] args) {
        int[] array = new int[1000000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i;
        }
        
        // Work stealing активен здесь
        ForkJoinPool pool = ForkJoinPool.commonPool();
        long result = pool.invoke(
            new RecursiveArraySum(array, 0, array.length)
        );
        
        System.out.println("Sum: " + result);
    }
}

Алгоритм Work Stealing

Каждый поток имеет ДЕQUE (double-ended queue):

Thread1 DEQUE: [Task1][Task2][Task3][Task4] ← Новые задачи добавляются сюда
               
Thread2 DEQUE: [Task5]  ← Поток добавляет работу

Текущая работа:
Thread1: Выполняет Task1
Thread2: Выполняет Task5

Когда Thread1 добавляет новые задачи:
1. fork() добавляет задачу в конец DEQUE
2. Thread1 продолжает вычисления (может добавить ещё)

Когда Thread2 заканчивает и у неё нет работы:
1. Ищет DEQUE других потоков (начиная с конца)
2. Если находит, "крадёт" задачу из конца DEQUE
3. Начинает выполнять украденную задачу

Это эффективнее, чем простая очередь, потому что:
- Меньше конфликтов при доступе
- Лучше локальность кэша
- Нет глобальной очереди блокировок

Практический сценарий

// Сценарий: обработка дерева файлов
public class FileProcessor extends RecursiveTask<Integer> {
    private File file;
    
    @Override
    protected Integer compute() {
        if (file.isFile()) {
            // Базовый случай: обработать файл
            return processFile(file);
        } else {
            // Рекурсивный случай: разбить на подзадачи
            int count = 0;
            File[] children = file.listFiles();
            
            List<FileProcessor> subtasks = new ArrayList<>();
            for (File child : children) {
                FileProcessor task = new FileProcessor(child);
                task.fork();  // Отправляем в очередь
                subtasks.add(task);
            }
            
            // Work stealing гарантирует, что все ядра будут заняты
            for (FileProcessor task : subtasks) {
                count += task.join();
            }
            return count;
        }
    }
}

Сравнение: обычный ThreadPool vs ForkJoinPool

// Обычный ExecutorService (без work stealing)
ExecutorService executor = Executors.newFixedThreadPool(4);
// Проблема: если одна задача занимает намного больше, чем другие,
// её поток будет загружен, а остальные будут ждать

// ForkJoinPool (с work stealing)
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
// Решение: свободные потоки могут "украсть" задачи
// и помочь перегруженным потокам

Когда использовать Work Stealing?

✅ ИСПОЛЬЗУЙ Work Stealing когда:
  - Задачи неравномерно распределены по времени
  - Используешь divide-and-conquer алгоритмы
  - Нужна работа на многоядерных процессорах
  - Есть рекурсивные подзадачи разного размера
  - Используешь parallel streams

❌ НЕ ИСПОЛЬЗУЙ Work Stealing когда:
  - Задачи одного размера и времени выполнения
  - Есть много синхронизации между задачами
  - Задачи очень быстрые (overhead больше пользы)
  - Задачи уже хорошо сбалансированы

Пример из реального мира

// Обработка больших данных
public class DataAnalyzer {
    public static long analyzeArray(int[] data) {
        return ForkJoinTask.invokeAll(
            new AnalysisTask(data, 0, data.length)
        ).parallelStream()
         .mapToLong(ForkJoinTask::join)
         .sum();
    }
}

// Обход дерева директорий
public class DirectoryWalker extends RecursiveTask<Integer> {
    private Path path;
    
    protected Integer compute() {
        int count = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
            List<DirectoryWalker> tasks = new ArrayList<>();
            
            for (Path entry : stream) {
                DirectoryWalker task = new DirectoryWalker(entry);
                task.fork();
                tasks.add(task);
            }
            
            for (DirectoryWalker task : tasks) {
                count += task.join();
            }
        }
        return count;
    }
}

Вывод

Work Stealing пригодится для:

  1. Параллельной обработки данных (parallel streams)
  2. Divide-and-conquer алгоритмов (ForkJoinPool)
  3. Работы на многоядерных системах
  4. Оптимизации использования CPU при неравномерной нагрузке

Это встроено в Java через ForkJoinPool и параллельные потоки, поэтому в большинстве случаев вы используете work stealing, даже не думая об этом.

Где может пригодиться Work Stealing? | PrepBro