← Назад к вопросам

Как реализуешь отправку и ожидание выполнения задач в ThreadPoolExecutor?

1.2 Junior🔥 71 комментариев
#Другое

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Как реализовать отправку и ожидание выполнения задач в ThreadPoolExecutor

ThreadPoolExecutor — это ключевой класс для многопоточного программирования в Java. Он позволяет эффективно управлять потоками и выполнять асинхронные задачи. Есть несколько стратегий отправки задач и ожидания их выполнения.

1. Базовое использование ThreadPoolExecutor

import java.util.concurrent.*;

public class TaskSubmissionExample {
    public static void main(String[] args) {
        // Создание пула потоков: 4 потока (core), максимум 8 потоков
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4,                              // corePoolSize
            8,                              // maximumPoolSize
            60,                             // keepAliveTime
            TimeUnit.SECONDS,               // timeUnit
            new LinkedBlockingQueue<>(100)  // workQueue
        );
        
        // Отправка задачи
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("Task executed in " + Thread.currentThread().getName());
            }
        });
        
        // Или с lambda (Java 8+)
        executor.execute(() -> System.out.println("Task executed"));
        
        // Завершение работы пула
        executor.shutdown();
    }
}

2. submit() для получения результата (Future)

Если нужно получить результат задачи:

public class FutureExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100)
        );
        
        // submit() с Callable возвращает Future<T>
        Future<String> future = executor.submit(() -> {
            Thread.sleep(1000);
            return "Task result";
        });
        
        // Получить результат (блокирует до завершения)
        String result = future.get(); // Ждёт выполнения
        System.out.println(result); // "Task result"
        
        // Или с timeout
        String resultWithTimeout = future.get(5, TimeUnit.SECONDS);
        
        executor.shutdown();
    }
}

VsFuture vs execute():

  • execute() — для Runnable, без результата
  • submit() — для Callable или Runnable, возвращает Future

3. Отправка множества задач и ожидание всех

Способ 1: invokeAll() — ждёт все задачи

public class InvokeAllExample {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()
        );
        
        // Список задач
        List<Callable<String>> tasks = Arrays.asList(
            () -> { Thread.sleep(1000); return "Result 1"; },
            () -> { Thread.sleep(500); return "Result 2"; },
            () -> { Thread.sleep(1500); return "Result 3"; }
        );
        
        // invokeAll() ждёт выполнения всех задач
        List<Future<String>> futures = executor.invokeAll(tasks);
        
        // Получить результаты
        for (Future<String> future : futures) {
            String result = future.get(); // Уже выполнена
            System.out.println(result);
        }
        
        // С timeout
        List<Future<String>> futuresWithTimeout = executor.invokeAll(
            tasks,
            3,              // timeout
            TimeUnit.SECONDS
        );
        
        executor.shutdown();
    }
}

Способ 2: invokeAny() — ждёт первую завершённую

public class InvokeAnyExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()
        );
        
        List<Callable<String>> tasks = Arrays.asList(
            () -> { Thread.sleep(3000); return "Result 1"; },
            () -> { Thread.sleep(500); return "Result 2"; },  // Эта выполнится первой
            () -> { Thread.sleep(1000); return "Result 3"; }
        );
        
        // invokeAny() вернёт результат первой завершённой задачи
        String fastestResult = executor.invokeAny(tasks); // "Result 2"
        System.out.println(fastestResult);
        
        executor.shutdown();
    }
}

4. shutdown() и awaitTermination()

Правильное завершение пула потоков:

public class GracefulShutdownExample {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()
        );
        
        // Отправка задач
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                try {
                    Thread.sleep(2000);
                    System.out.println("Task completed");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // Завершение
        executor.shutdown(); // Не принимает новые задачи, ждёт старые
        
        // Ждём завершения (максимум 10 секунд)
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            // Если не завершилось за 10 секунд
            List<Runnable> unfinished = executor.shutdownNow();
            System.out.println("Forced shutdown. Unfinished tasks: " + unfinished.size());
        }
        
        System.out.println("All tasks completed");
    }
}

Сравнение:

  • shutdown() — перестаёт принимать новые задачи, ждёт текущие
  • shutdownNow() — прерывает все задачи и вернёт невыполненные
  • awaitTermination() — ждёт завершения с timeout

5. CountDownLatch для ожидания группы задач

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int numTasks = 5;
        CountDownLatch latch = new CountDownLatch(numTasks);
        
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()
        );
        
        for (int i = 0; i < numTasks; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    System.out.println("Task " + taskId + " started");
                    Thread.sleep(1000);
                    System.out.println("Task " + taskId + " completed");
                } finally {
                    latch.countDown(); // Уменьшает счётчик на 1
                }
            });
        }
        
        // Ждём, пока все задачи не завершатся (счётчик не станет 0)
        latch.await();
        System.out.println("All tasks completed!");
        
        executor.shutdown();
    }
}

6. CyclicBarrier для синхронизации потоков

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int numThreads = 4;
        CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {
            System.out.println("All threads reached the barrier!");
        });
        
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            numThreads, numThreads, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()
        );
        
        for (int i = 0; i < numThreads; i++) {
            final int threadId = i;
            executor.submit(() -> {
                try {
                    System.out.println("Thread " + threadId + " starting work");
                    Thread.sleep(1000 + new Random().nextInt(2000));
                    System.out.println("Thread " + threadId + " waiting at barrier");
                    barrier.await(); // Ждёт остальные потоки
                    System.out.println("Thread " + threadId + " continuing");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        executor.shutdown();
    }
}

7. CompletableFuture для асинхронной композиции

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()
        );
        
        // Единая асинхронная задача
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
                return "First result";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, executor);
        
        // Цепочка операций
        CompletableFuture<String> chain = future
            .thenApply(result -> result.toUpperCase())
            .thenApply(result -> result + " - processed")
            .thenApplyAsync(result -> {
                // Выполняется в другом потоке
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return result + " - after delay";
            }, executor);
        
        // Ожидание результата
        String finalResult = chain.get();
        System.out.println(finalResult);
        
        executor.shutdown();
    }
}

8. Правильная обработка исключений

public class ExceptionHandlingExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()
        );
        
        Future<Integer> future = executor.submit(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("Task failed!");
            }
            return 42;
        });
        
        try {
            Integer result = future.get();
            System.out.println("Result: " + result);
        } catch (ExecutionException e) {
            System.out.println("Task failed with error: " + e.getCause().getMessage());
        }
        
        executor.shutdown();
    }
}

9. Практический пример: обработка множества HTTP запросов

public class HttpTaskProcessing {
    private final ThreadPoolExecutor executor;
    
    public HttpTaskProcessing() {
        this.executor = new ThreadPoolExecutor(
            10,
            20,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100)
        );
    }
    
    public List<String> fetchUrlsInParallel(List<String> urls) 
            throws InterruptedException {
        List<Callable<String>> tasks = urls.stream()
            .map(url -> (Callable<String>) () -> fetchUrl(url))
            .collect(Collectors.toList());
        
        List<Future<String>> futures = executor.invokeAll(tasks, 30, TimeUnit.SECONDS);
        
        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            try {
                results.add(future.get());
            } catch (ExecutionException e) {
                results.add("ERROR: " + e.getCause().getMessage());
            }
        }
        
        return results;
    }
    
    private String fetchUrl(String url) {
        // Имитация HTTP запроса
        try {
            Thread.sleep(1000);
            return "Content of " + url;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }
    
    public void shutdown() throws InterruptedException {
        executor.shutdown();
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    }
}

10. Лучшие практики

public class BestPractices {
    // Используй Executors factory методы
    ExecutorService executor1 = Executors.newFixedThreadPool(10);
    ExecutorService executor2 = Executors.newCachedThreadPool();
    ExecutorService executor3 = Executors.newSingleThreadExecutor();
    ExecutorService executor4 = Executors.newScheduledThreadPool(5);
    
    // Всегда завершай пул
    void cleanup() throws InterruptedException {
        executor1.shutdown();
        if (!executor1.awaitTermination(30, TimeUnit.SECONDS)) {
            executor1.shutdownNow();
        }
    }
    
    // Используй try-with-resources где возможно (Java 7+)
    void example() throws Exception {
        try (ExecutorService executor = Executors.newFixedThreadPool(4)) {
            executor.submit(() -> System.out.println("Task"));
        } // Автоматически вызывает shutdown()
    }
}

Сравнение методов ожидания

МетодИспользованиеПримечание
Future.get()Ожидание одной задачиБлокирует
invokeAll()Ожидание всех задачВозвращает все Future
invokeAny()Ожидание первойВозвращает первый результат
CountDownLatchСинхронизация событийОдноразовое использование
CyclicBarrierСинхронизация потоковПереиспользуемый barrier
CompletableFutureАсинхронная композицияЦепочка операций
shutdown()/awaitTermination()Завершение пулаGraceful shutdown

Заключение

Выбор метода зависит от задачи:

  • Одна задача → Future.get()
  • Группа задач → invokeAll() или CountDownLatch
  • Первая готовая → invokeAny()
  • Асинхронные цепочки → CompletableFuture
  • Завершение приложения → shutdown() + awaitTermination()

Всегда предусмотрите правильное завершение пула потоков!