← Назад к вопросам

Что такое пакет concurrent в Java?

2.0 Middle🔥 141 комментариев
#Многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Пакет concurrent в Java

Пакет java.util.concurrent — это одна из наиболее важных библиотек Java для многопоточной разработки. Введена в Java 5, она содержит потокобезопасные структуры данных и утилиты для работы с потоками.

История и назначение

До Java 5 разработчики использовали только синхронизацию через synchronized, что было уязвимо для deadlocks и race conditions. Пакет concurrent предоставляет более безопасные и эффективные инструменты.

Основные компоненты

1. Executor Framework

Утилиты для управления потоками более высокого уровня, чем직접 работа с Thread.

ExecutorService — главный интерфейс:

// Создание пула потоков
ExecutorService executor = Executors.newFixedThreadPool(4);

// Отправка задачи
executor.execute(() -> System.out.println("Task 1"));

// Отправка задачи с результатом
Future<Integer> future = executor.submit(() -> {
    Thread.sleep(2000);
    return 42;
});

// Получение результата (блокирует до готовности)
try {
    int result = future.get();  // Ждёт 2 секунды
    System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

// Завершить executor
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);

Типы Executor'ов:

// Fixed thread pool — фиксированное количество потоков
ExecutorService fixed = Executors.newFixedThreadPool(4);

// Cached thread pool — динамическое количество потоков
ExecutorService cached = Executors.newCachedThreadPool();

// Single thread executor — один поток
ExecutorService single = Executors.newSingleThreadExecutor();

// Fork/Join pool — для divide-and-conquer алгоритмов
ForkJoinPool forkJoin = Executors.newWorkStealingPool();

// Scheduled executor — для периодических задач
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
scheduled.scheduleAtFixedRate(
    () -> System.out.println("Task"),
    0,      // Начальная задержка
    5,      // Период
    TimeUnit.SECONDS
);

2. Concurrent Collections

Потокобезопасные структуры данных, которые не требуют внешней синхронизации.

ConcurrentHashMap:

// Вместо synchronized HashMap
Map<String, Integer> map = new ConcurrentHashMap<>();

map.put("key1", 1);
int value = map.get("key1");

// Атомарные операции без явной синхронизации
map.putIfAbsent("key2", 2);
map.replace("key1", 1, 10);  // Заменить только если значение = 1

// Итерация безопасна (даже если другие потоки изменяют карту)
for (String key : map.keySet()) {
    System.out.println(key);
}

Сравнение с synchronized:

// ❌ Плохо — весь HashMap заблокирован
Map<String, Integer> map1 = Collections.synchronizedMap(new HashMap<>());

// ✅ Хорошо — использует внутренние блокировки (buckets)
Map<String, Integer> map2 = new ConcurrentHashMap<>();

Другие concurrent структуры:

// CopyOnWriteArrayList — безопасна для чтения
List<String> list = new CopyOnWriteArrayList<>();
list.add("item1");
list.add("item2");
for (String item : list) {  // Безопасно читать, пока другие пишут
    System.out.println(item);
}

// BlockingQueue — для producer-consumer pattern
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
queue.put(1);  // Блокирует, если очередь полна
int item = queue.take();  // Блокирует, если очередь пуста

// ConcurrentSkipListMap — отсортированная карта
NavigableMap<Integer, String> skipList = new ConcurrentSkipListMap<>();
skipList.put(3, "three");
skipList.put(1, "one");
for (Integer key : skipList.keySet()) {  // Итерирует в порядке
    System.out.println(key);
}

3. Atomic Variables

Атомарные переменные для безопасной работы с простыми типами без синхронизации.

// ❌ Плохо — race condition
private int counter = 0;

public void increment() {
    counter++;  // NOT THREAD-SAFE!
}

// ✅ Хорошо — атомарная операция
private AtomicInteger counter = new AtomicInteger(0);

public void increment() {
    counter.incrementAndGet();  // THREAD-SAFE
}

public int getCounter() {
    return counter.get();
}

Основные атомарные классы:

AtomicInteger counter = new AtomicInteger(0);
counter.incrementAndGet();      // Увеличить и вернуть
counter.decrementAndGet();      // Уменьшить и вернуть
counter.addAndGet(5);           // Добавить и вернуть
counter.getAndIncrement();      // Вернуть и увеличить
counter.compareAndSet(1, 2);    // CAS операция

AtomicReference<String> ref = new AtomicReference<>("initial");
ref.set("new value");
String value = ref.get();

AtomicLong longValue = new AtomicLong(0);
long result = longValue.incrementAndGet();

AtomicBoolean flag = new AtomicBoolean(false);
flag.set(true);
boolean isSet = flag.get();

4. Synchronizers

Утилиты для синхронизации потоков в сложных сценариях.

CountDownLatch — ждёт, пока N операций завершится:

CountDownLatch latch = new CountDownLatch(3);

// Запустить 3 потока
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        System.out.println("Task starting");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Task done");
        latch.countDown();  // Уменьшить счётчик
    }).start();
}

// Ждать, пока все потоки завершатся
latch.await();
System.out.println("All tasks completed");

CyclicBarrier — ждёт, пока все потоки достигнут точки:

CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("All threads reached barrier");
});

for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        System.out.println("Thread waiting at barrier");
        try {
            barrier.await();  // Ждать других потоков
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("Thread continuing");
    }).start();
}

Semaphore — контролирует доступ к ресурсам:

// Только 2 потока могут одновременно входить
Semaphore semaphore = new Semaphore(2);

for (int i = 0; i < 5; i++) {
    new Thread(() -> {
        try {
            semaphore.acquire();  // Взять разрешение
            System.out.println("Thread entered");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();  // Отпустить разрешение
            System.out.println("Thread exited");
        }
    }).start();
}

ReadWriteLock — оптимизирует доступ для операций чтения:

ReadWriteLock rwLock = new ReentrantReadWriteLock();
int sharedData = 0;

// Много потоков могут одновременно читать
rwLock.readLock().lock();
try {
    int value = sharedData;
    System.out.println(value);
} finally {
    rwLock.readLock().unlock();
}

// Только один поток может писать
rwLock.writeLock().lock();
try {
    sharedData = 10;
} finally {
    rwLock.writeLock().unlock();
}

5. Future и Callable

Для асинхронного выполнения с результатами.

ExecutorService executor = Executors.newFixedThreadPool(2);

// Callable возвращает результат
Callable<String> task = () -> {
    Thread.sleep(2000);
    return "Task completed";
};

Future<String> future = executor.submit(task);

// Проверить статус
if (!future.isDone()) {
    System.out.println("Task still running");
}

// Получить результат (блокирует)
try {
    String result = future.get(5, TimeUnit.SECONDS);  // С таймаутом
    System.out.println(result);
} catch (TimeoutException e) {
    System.out.println("Task took too long");
    future.cancel(true);  // Отменить задачу
}

executor.shutdown();

6. CompletableFuture

Модернизированная версия Future (Java 8+):

// Создать асинхронную задачу
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
        return "Result";
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
});

// Цепочка операций (не блокирует!)
future
    .thenApply(result -> result.toUpperCase())
    .thenApply(result -> result + "!")
    .thenAccept(result -> System.out.println("Final: " + result))
    .exceptionally(ex -> {
        System.err.println("Error: " + ex.getMessage());
        return null;
    });

// Ждать завершения
Thread.sleep(5000);

Практический пример: Producer-Consumer

public class ProducerConsumer {
    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
    
    public void start() {
        // Producer поток
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    queue.put(i);
                    System.out.println("Produced: " + i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
        
        // Consumer потоки
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Integer item = queue.take();
                        System.out.println("Consumed: " + item);
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }).start();
        }
    }
}

Вывод

Пакет java.util.concurrent — это фундамент многопоточной разработки в Java. Вместо низкоуровневой синхронизации synchronized, нужно использовать:

  • ExecutorService — для управления потоками
  • ConcurrentHashMap — для параллельного доступа к данным
  • AtomicInteger — для безопасных операций с примитивами
  • BlockingQueue — для потокобезопасной очереди
  • Synchronizers — для координации потоков

Эти инструменты делают многопоточный код проще, безопаснее и производительнее.

Что такое пакет concurrent в Java? | PrepBro