← Назад к вопросам
Как реализуешь отправку и ожидание выполнения задач в ThreadPoolExecutor?
1.2 Junior🔥 71 комментариев
#Другое
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Как реализовать отправку и ожидание выполнения задач в ThreadPoolExecutor
ThreadPoolExecutor — это ключевой класс для многопоточного программирования в Java. Он позволяет эффективно управлять потоками и выполнять асинхронные задачи. Есть несколько стратегий отправки задач и ожидания их выполнения.
1. Базовое использование ThreadPoolExecutor
import java.util.concurrent.*;
public class TaskSubmissionExample {
public static void main(String[] args) {
// Создание пула потоков: 4 потока (core), максимум 8 потоков
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, // corePoolSize
8, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // timeUnit
new LinkedBlockingQueue<>(100) // workQueue
);
// Отправка задачи
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("Task executed in " + Thread.currentThread().getName());
}
});
// Или с lambda (Java 8+)
executor.execute(() -> System.out.println("Task executed"));
// Завершение работы пула
executor.shutdown();
}
}
2. submit() для получения результата (Future)
Если нужно получить результат задачи:
public class FutureExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
);
// submit() с Callable возвращает Future<T>
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Task result";
});
// Получить результат (блокирует до завершения)
String result = future.get(); // Ждёт выполнения
System.out.println(result); // "Task result"
// Или с timeout
String resultWithTimeout = future.get(5, TimeUnit.SECONDS);
executor.shutdown();
}
}
VsFuture vs execute():
- execute() — для Runnable, без результата
- submit() — для Callable или Runnable, возвращает Future
3. Отправка множества задач и ожидание всех
Способ 1: invokeAll() — ждёт все задачи
public class InvokeAllExample {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
// Список задач
List<Callable<String>> tasks = Arrays.asList(
() -> { Thread.sleep(1000); return "Result 1"; },
() -> { Thread.sleep(500); return "Result 2"; },
() -> { Thread.sleep(1500); return "Result 3"; }
);
// invokeAll() ждёт выполнения всех задач
List<Future<String>> futures = executor.invokeAll(tasks);
// Получить результаты
for (Future<String> future : futures) {
String result = future.get(); // Уже выполнена
System.out.println(result);
}
// С timeout
List<Future<String>> futuresWithTimeout = executor.invokeAll(
tasks,
3, // timeout
TimeUnit.SECONDS
);
executor.shutdown();
}
}
Способ 2: invokeAny() — ждёт первую завершённую
public class InvokeAnyExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
List<Callable<String>> tasks = Arrays.asList(
() -> { Thread.sleep(3000); return "Result 1"; },
() -> { Thread.sleep(500); return "Result 2"; }, // Эта выполнится первой
() -> { Thread.sleep(1000); return "Result 3"; }
);
// invokeAny() вернёт результат первой завершённой задачи
String fastestResult = executor.invokeAny(tasks); // "Result 2"
System.out.println(fastestResult);
executor.shutdown();
}
}
4. shutdown() и awaitTermination()
Правильное завершение пула потоков:
public class GracefulShutdownExample {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
// Отправка задач
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
try {
Thread.sleep(2000);
System.out.println("Task completed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// Завершение
executor.shutdown(); // Не принимает новые задачи, ждёт старые
// Ждём завершения (максимум 10 секунд)
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
// Если не завершилось за 10 секунд
List<Runnable> unfinished = executor.shutdownNow();
System.out.println("Forced shutdown. Unfinished tasks: " + unfinished.size());
}
System.out.println("All tasks completed");
}
}
Сравнение:
- shutdown() — перестаёт принимать новые задачи, ждёт текущие
- shutdownNow() — прерывает все задачи и вернёт невыполненные
- awaitTermination() — ждёт завершения с timeout
5. CountDownLatch для ожидания группы задач
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int numTasks = 5;
CountDownLatch latch = new CountDownLatch(numTasks);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
for (int i = 0; i < numTasks; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("Task " + taskId + " started");
Thread.sleep(1000);
System.out.println("Task " + taskId + " completed");
} finally {
latch.countDown(); // Уменьшает счётчик на 1
}
});
}
// Ждём, пока все задачи не завершатся (счётчик не станет 0)
latch.await();
System.out.println("All tasks completed!");
executor.shutdown();
}
}
6. CyclicBarrier для синхронизации потоков
public class CyclicBarrierExample {
public static void main(String[] args) {
int numThreads = 4;
CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {
System.out.println("All threads reached the barrier!");
});
ThreadPoolExecutor executor = new ThreadPoolExecutor(
numThreads, numThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
for (int i = 0; i < numThreads; i++) {
final int threadId = i;
executor.submit(() -> {
try {
System.out.println("Thread " + threadId + " starting work");
Thread.sleep(1000 + new Random().nextInt(2000));
System.out.println("Thread " + threadId + " waiting at barrier");
barrier.await(); // Ждёт остальные потоки
System.out.println("Thread " + threadId + " continuing");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
7. CompletableFuture для асинхронной композиции
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
// Единая асинхронная задача
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
return "First result";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, executor);
// Цепочка операций
CompletableFuture<String> chain = future
.thenApply(result -> result.toUpperCase())
.thenApply(result -> result + " - processed")
.thenApplyAsync(result -> {
// Выполняется в другом потоке
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return result + " - after delay";
}, executor);
// Ожидание результата
String finalResult = chain.get();
System.out.println(finalResult);
executor.shutdown();
}
}
8. Правильная обработка исключений
public class ExceptionHandlingExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
Future<Integer> future = executor.submit(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Task failed!");
}
return 42;
});
try {
Integer result = future.get();
System.out.println("Result: " + result);
} catch (ExecutionException e) {
System.out.println("Task failed with error: " + e.getCause().getMessage());
}
executor.shutdown();
}
}
9. Практический пример: обработка множества HTTP запросов
public class HttpTaskProcessing {
private final ThreadPoolExecutor executor;
public HttpTaskProcessing() {
this.executor = new ThreadPoolExecutor(
10,
20,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
);
}
public List<String> fetchUrlsInParallel(List<String> urls)
throws InterruptedException {
List<Callable<String>> tasks = urls.stream()
.map(url -> (Callable<String>) () -> fetchUrl(url))
.collect(Collectors.toList());
List<Future<String>> futures = executor.invokeAll(tasks, 30, TimeUnit.SECONDS);
List<String> results = new ArrayList<>();
for (Future<String> future : futures) {
try {
results.add(future.get());
} catch (ExecutionException e) {
results.add("ERROR: " + e.getCause().getMessage());
}
}
return results;
}
private String fetchUrl(String url) {
// Имитация HTTP запроса
try {
Thread.sleep(1000);
return "Content of " + url;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "INTERRUPTED";
}
}
public void shutdown() throws InterruptedException {
executor.shutdown();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
}
}
10. Лучшие практики
public class BestPractices {
// Используй Executors factory методы
ExecutorService executor1 = Executors.newFixedThreadPool(10);
ExecutorService executor2 = Executors.newCachedThreadPool();
ExecutorService executor3 = Executors.newSingleThreadExecutor();
ExecutorService executor4 = Executors.newScheduledThreadPool(5);
// Всегда завершай пул
void cleanup() throws InterruptedException {
executor1.shutdown();
if (!executor1.awaitTermination(30, TimeUnit.SECONDS)) {
executor1.shutdownNow();
}
}
// Используй try-with-resources где возможно (Java 7+)
void example() throws Exception {
try (ExecutorService executor = Executors.newFixedThreadPool(4)) {
executor.submit(() -> System.out.println("Task"));
} // Автоматически вызывает shutdown()
}
}
Сравнение методов ожидания
| Метод | Использование | Примечание |
|---|---|---|
| Future.get() | Ожидание одной задачи | Блокирует |
| invokeAll() | Ожидание всех задач | Возвращает все Future |
| invokeAny() | Ожидание первой | Возвращает первый результат |
| CountDownLatch | Синхронизация событий | Одноразовое использование |
| CyclicBarrier | Синхронизация потоков | Переиспользуемый barrier |
| CompletableFuture | Асинхронная композиция | Цепочка операций |
| shutdown()/awaitTermination() | Завершение пула | Graceful shutdown |
Заключение
Выбор метода зависит от задачи:
- Одна задача → Future.get()
- Группа задач → invokeAll() или CountDownLatch
- Первая готовая → invokeAny()
- Асинхронные цепочки → CompletableFuture
- Завершение приложения → shutdown() + awaitTermination()
Всегда предусмотрите правильное завершение пула потоков!