Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
java.util.concurrent: Многопоточное программирование
java.util.concurrent — это пакет Java, который содержит классы и интерфейсы для написания многопоточных приложений. Он был введён в Java 5 для упрощения работы с потоками и предотвращения ошибок синхронизации.
История
До Java 5 разработчикам приходилось работать с низкоуровневыми примитивами синхронизации (synchronized, wait/notify). Это было сложно, подвержено ошибкам и потокобезопасно при небольших ошибках. Java 5 привнесла java.util.concurrent, созданный Doug Lea, который предоставляет высокоуровневые инструменты для многопоточности.
Основные компоненты
1. Executors (Исполнители)
Это фреймворк для управления потоками вместо прямого создания через new Thread().
// Старый способ — плохо
for (int i = 0; i < 100; i++) {
new Thread(() -> {
// выполнить задачу
}).start();
}
// Проблемы: утечка ресурсов, нет контроля, сложно масштабировать
// Новый способ с Executor — хорошо
Executor executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executor.execute(() -> {
// выполнить задачу
});
}
Типы ExecutorService:
// 1. FixedThreadPool — фиксированное количество потоков
ExecutorService executor = Executors.newFixedThreadPool(10);
// Хорошо для известной нагрузки, переиспользует потоки
// 2. CachedThreadPool — создаёт потоки по требованию
ExecutorService executor = Executors.newCachedThreadPool();
// Для коротких асинхронных задач, потоки переиспользуются
// 3. SingleThreadExecutor — один поток для выполнения задач
ExecutorService executor = Executors.newSingleThreadExecutor();
// Гарантирует последовательное выполнение задач
// 4. ScheduledExecutorService — выполнение с расписанием
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> System.out.println("Задача"), 10, TimeUnit.SECONDS);
executor.scheduleAtFixedRate(() -> System.out.println("Повторяемая"), 0, 5, TimeUnit.SECONDS);
// 5. ForkJoinPool — для разделяй и властвуй алгоритмов
ForkJoinPool pool = ForkJoinPool.commonPool();
2. Future и Callable
Для получения результатов асинхронных операций.
// Callable — функция, которая возвращает результат
Callable<Integer> task = () -> {
Thread.sleep(2000);
return 42;
};
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(task);
// Получить результат (блокирует до завершения)
try {
Integer result = future.get(); // получить результат
System.out.println("Результат: " + result);
// Получить с timeout
Integer resultWithTimeout = future.get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
// Проверить статус
if (!future.isDone()) {
System.out.println("Задача ещё выполняется");
}
// Отменить задачу
future.cancel(true); // true = прервать даже если выполняется
3. CountDownLatch
Для синхронизации: ждёт, пока счётчик не достигнет нуля.
// Сценарий: главный поток ждёт, пока 5 рабочих потоков завершат работу
CountDownLatch latch = new CountDownLatch(5);
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + " начал работу");
Thread.sleep((long) (Math.random() * 5000));
System.out.println(Thread.currentThread().getName() + " закончил");
} finally {
latch.countDown(); // уменьшить счётчик на 1
}
});
}
// Главный поток ждёт
try {
System.out.println("Главный поток ждёт...");
latch.await(); // ждёт, пока счётчик не станет 0
System.out.println("Все потоки завершили работу!");
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
4. CyclicBarrier
Для синхронизации групп потоков: все потоки ждут друг друга в точке встречи.
// Сценарий: 3 потока должны встретиться в одной точке перед продолжением
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("Все потоки прошли через барьер!");
});
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 3; i++) {
final int id = i;
executor.submit(() -> {
try {
System.out.println("Поток " + id + " ждёт других");
barrier.await(); // ждёт, пока все потоки доберутся сюда
System.out.println("Поток " + id + " продолжает работу");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
5. Semaphore
Для ограничения доступа к ресурсам.
// Сценарий: только 3 потока одновременно могут работать с БД
Semaphore semaphore = new Semaphore(3);
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 1; i <= 10; i++) {
final int id = i;
executor.submit(() -> {
try {
System.out.println("Поток " + id + " ждёт разрешения");
semaphore.acquire(); // получить разрешение (блокирует если нет)
System.out.println("Поток " + id + " работает с БД");
Thread.sleep(2000);
System.out.println("Поток " + id + " завершил");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // освободить разрешение
}
});
}
executor.shutdown();
6. Concurrent Collections
Потокобезопасные коллекции вместо synchronized.
// Старый способ — всё блокируется
Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
// Проблемы: вся таблица блокируется при каждой операции
// Новый способ — ConcurrentHashMap
ConcurrentMap<String, String> concurrentMap = new ConcurrentHashMap<>();
// Разделённая блокировка (segment locking) — только нужная часть блокируется
// Другие concurrent коллекции
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
// Примеры использования
concurrentMap.putIfAbsent("key", "value");
concurrentMap.replace("key", "oldValue", "newValue");
7. BlockingQueue
Для общения между потоками.
// Сценарий: producer-consumer паттерн
BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
// Producer
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
try {
for (int i = 0; i < 5; i++) {
System.out.println("Производитель добавляет: " + i);
queue.put("item-" + i); // добавить элемент (блокирует если очередь полна)
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer
executor.submit(() -> {
try {
while (true) {
String item = queue.take(); // получить элемент (блокирует если очередь пуста)
System.out.println("Потребитель получил: " + item);
Thread.sleep(2000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
executor.shutdown();
8. Locks
Альтернатива synchronized для более гибкого управления.
// synchronized — просто и обычно достаточно
public synchronized void method() { }
// Lock — более гибкое управление
private final Lock lock = new ReentrantLock();
public void method() {
lock.lock();
try {
// защищённый код
} finally {
lock.unlock();
}
}
// ReadWriteLock — для много читателей, мало писателей
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void read() {
rwLock.readLock().lock();
try {
// чтение данных
} finally {
rwLock.readLock().unlock();
}
}
public void write() {
rwLock.writeLock().lock();
try {
// изменение данных
} finally {
rwLock.writeLock().unlock();
}
}
CompletableFuture (Java 8+)
Высокоуровневый API для асинхронного программирования.
// Простой асинхронный процесс
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
return "Результат";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// Обработка результата
future.thenAccept(result -> System.out.println("Результат: " + result));
// Цепочка операций
CompletableFuture<String> chain = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(str -> str + " World")
.thenApply(String::toUpperCase);
chain.thenAccept(System.out::println); // HELLO WORLD
// Комбинирование нескольких Future
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> combined = future1.thenCombine(future2, (a, b) -> a + b);
Best Practices
- Используйте Executor вместо прямого создания потоков
- Shutdown properly — всегда вызывайте executor.shutdown() или shutdownNow()
- Обрабатывайте исключения — Future может выбросить ExecutionException
- Выбирайте правильный пул — FixedThreadPool для ограниченных ресурсов
- Используйте concurrent коллекции вместо synchronized
- Избегайте deadlocks — не держите несколько блокировок одновременно
- Тестируйте многопоточность — она подвержена race conditions
Вывод
java.util.concurrent предоставляет мощные инструменты для безопасного многопоточного программирования. Вместо работы с низкоуровневыми примитивами синхронизации, разработчики могут использовать высокоуровневые абстракции, которые менее подвержены ошибкам и легче понимать.