Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Пакет concurrent в Java
Пакет java.util.concurrent — это одна из наиболее важных библиотек Java для многопоточной разработки. Введена в Java 5, она содержит потокобезопасные структуры данных и утилиты для работы с потоками.
История и назначение
До Java 5 разработчики использовали только синхронизацию через synchronized, что было уязвимо для deadlocks и race conditions. Пакет concurrent предоставляет более безопасные и эффективные инструменты.
Основные компоненты
1. Executor Framework
Утилиты для управления потоками более высокого уровня, чем직접 работа с Thread.
ExecutorService — главный интерфейс:
// Создание пула потоков
ExecutorService executor = Executors.newFixedThreadPool(4);
// Отправка задачи
executor.execute(() -> System.out.println("Task 1"));
// Отправка задачи с результатом
Future<Integer> future = executor.submit(() -> {
Thread.sleep(2000);
return 42;
});
// Получение результата (блокирует до готовности)
try {
int result = future.get(); // Ждёт 2 секунды
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// Завершить executor
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
Типы Executor'ов:
// Fixed thread pool — фиксированное количество потоков
ExecutorService fixed = Executors.newFixedThreadPool(4);
// Cached thread pool — динамическое количество потоков
ExecutorService cached = Executors.newCachedThreadPool();
// Single thread executor — один поток
ExecutorService single = Executors.newSingleThreadExecutor();
// Fork/Join pool — для divide-and-conquer алгоритмов
ForkJoinPool forkJoin = Executors.newWorkStealingPool();
// Scheduled executor — для периодических задач
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
scheduled.scheduleAtFixedRate(
() -> System.out.println("Task"),
0, // Начальная задержка
5, // Период
TimeUnit.SECONDS
);
2. Concurrent Collections
Потокобезопасные структуры данных, которые не требуют внешней синхронизации.
ConcurrentHashMap:
// Вместо synchronized HashMap
Map<String, Integer> map = new ConcurrentHashMap<>();
map.put("key1", 1);
int value = map.get("key1");
// Атомарные операции без явной синхронизации
map.putIfAbsent("key2", 2);
map.replace("key1", 1, 10); // Заменить только если значение = 1
// Итерация безопасна (даже если другие потоки изменяют карту)
for (String key : map.keySet()) {
System.out.println(key);
}
Сравнение с synchronized:
// ❌ Плохо — весь HashMap заблокирован
Map<String, Integer> map1 = Collections.synchronizedMap(new HashMap<>());
// ✅ Хорошо — использует внутренние блокировки (buckets)
Map<String, Integer> map2 = new ConcurrentHashMap<>();
Другие concurrent структуры:
// CopyOnWriteArrayList — безопасна для чтения
List<String> list = new CopyOnWriteArrayList<>();
list.add("item1");
list.add("item2");
for (String item : list) { // Безопасно читать, пока другие пишут
System.out.println(item);
}
// BlockingQueue — для producer-consumer pattern
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
queue.put(1); // Блокирует, если очередь полна
int item = queue.take(); // Блокирует, если очередь пуста
// ConcurrentSkipListMap — отсортированная карта
NavigableMap<Integer, String> skipList = new ConcurrentSkipListMap<>();
skipList.put(3, "three");
skipList.put(1, "one");
for (Integer key : skipList.keySet()) { // Итерирует в порядке
System.out.println(key);
}
3. Atomic Variables
Атомарные переменные для безопасной работы с простыми типами без синхронизации.
// ❌ Плохо — race condition
private int counter = 0;
public void increment() {
counter++; // NOT THREAD-SAFE!
}
// ✅ Хорошо — атомарная операция
private AtomicInteger counter = new AtomicInteger(0);
public void increment() {
counter.incrementAndGet(); // THREAD-SAFE
}
public int getCounter() {
return counter.get();
}
Основные атомарные классы:
AtomicInteger counter = new AtomicInteger(0);
counter.incrementAndGet(); // Увеличить и вернуть
counter.decrementAndGet(); // Уменьшить и вернуть
counter.addAndGet(5); // Добавить и вернуть
counter.getAndIncrement(); // Вернуть и увеличить
counter.compareAndSet(1, 2); // CAS операция
AtomicReference<String> ref = new AtomicReference<>("initial");
ref.set("new value");
String value = ref.get();
AtomicLong longValue = new AtomicLong(0);
long result = longValue.incrementAndGet();
AtomicBoolean flag = new AtomicBoolean(false);
flag.set(true);
boolean isSet = flag.get();
4. Synchronizers
Утилиты для синхронизации потоков в сложных сценариях.
CountDownLatch — ждёт, пока N операций завершится:
CountDownLatch latch = new CountDownLatch(3);
// Запустить 3 потока
for (int i = 0; i < 3; i++) {
new Thread(() -> {
System.out.println("Task starting");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task done");
latch.countDown(); // Уменьшить счётчик
}).start();
}
// Ждать, пока все потоки завершатся
latch.await();
System.out.println("All tasks completed");
CyclicBarrier — ждёт, пока все потоки достигнут точки:
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All threads reached barrier");
});
for (int i = 0; i < 3; i++) {
new Thread(() -> {
System.out.println("Thread waiting at barrier");
try {
barrier.await(); // Ждать других потоков
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("Thread continuing");
}).start();
}
Semaphore — контролирует доступ к ресурсам:
// Только 2 потока могут одновременно входить
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // Взять разрешение
System.out.println("Thread entered");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // Отпустить разрешение
System.out.println("Thread exited");
}
}).start();
}
ReadWriteLock — оптимизирует доступ для операций чтения:
ReadWriteLock rwLock = new ReentrantReadWriteLock();
int sharedData = 0;
// Много потоков могут одновременно читать
rwLock.readLock().lock();
try {
int value = sharedData;
System.out.println(value);
} finally {
rwLock.readLock().unlock();
}
// Только один поток может писать
rwLock.writeLock().lock();
try {
sharedData = 10;
} finally {
rwLock.writeLock().unlock();
}
5. Future и Callable
Для асинхронного выполнения с результатами.
ExecutorService executor = Executors.newFixedThreadPool(2);
// Callable возвращает результат
Callable<String> task = () -> {
Thread.sleep(2000);
return "Task completed";
};
Future<String> future = executor.submit(task);
// Проверить статус
if (!future.isDone()) {
System.out.println("Task still running");
}
// Получить результат (блокирует)
try {
String result = future.get(5, TimeUnit.SECONDS); // С таймаутом
System.out.println(result);
} catch (TimeoutException e) {
System.out.println("Task took too long");
future.cancel(true); // Отменить задачу
}
executor.shutdown();
6. CompletableFuture
Модернизированная версия Future (Java 8+):
// Создать асинхронную задачу
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
return "Result";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// Цепочка операций (не блокирует!)
future
.thenApply(result -> result.toUpperCase())
.thenApply(result -> result + "!")
.thenAccept(result -> System.out.println("Final: " + result))
.exceptionally(ex -> {
System.err.println("Error: " + ex.getMessage());
return null;
});
// Ждать завершения
Thread.sleep(5000);
Практический пример: Producer-Consumer
public class ProducerConsumer {
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
public void start() {
// Producer поток
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
queue.put(i);
System.out.println("Produced: " + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
// Consumer потоки
for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Integer item = queue.take();
System.out.println("Consumed: " + item);
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
}
Вывод
Пакет java.util.concurrent — это фундамент многопоточной разработки в Java. Вместо низкоуровневой синхронизации synchronized, нужно использовать:
- ExecutorService — для управления потоками
- ConcurrentHashMap — для параллельного доступа к данным
- AtomicInteger — для безопасных операций с примитивами
- BlockingQueue — для потокобезопасной очереди
- Synchronizers — для координации потоков
Эти инструменты делают многопоточный код проще, безопаснее и производительнее.