Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Многопоточность в Java: практический опыт
Многопоточность — это одно из самых сложных, но и самых важных мест в Java. Расскажу о том, с чем я работал.
1. Thread Pools (Executor Framework)
Вместо создания новых потоков для каждой задачи, используем пулы потоков.
// ❌ Плохо: создаём новый Thread для каждого запроса
@GetMapping("/process")
public void processRequest() {
new Thread(() -> {
heavyComputation();
}).start(); // Каждый запрос = новый поток
}
// ✅ Хорошо: используем ThreadPool
@Service
public class ProcessingService {
private final ExecutorService executor =
Executors.newFixedThreadPool(10);
public void processRequest() {
// Из пула 10 потоков выбираем свободный
executor.submit(() -> heavyComputation());
}
}
Где использую:
- Обработка асинхронных запросов
- Batch processing (например, экспорт данных)
- Параллельные вычисления
2. Spring @Async — удобная абстракция над ThreadPool
@Service
public class EmailService {
// Этот метод выполнится в отдельном потоке из пула
@Async
public void sendEmail(String to, String subject, String body) {
// Пример: отправка 1000 писем
// Основной поток не блокируется!
mailService.send(to, subject, body);
}
}
@RestController
public class OrderController {
@PostMapping("/orders")
public ResponseEntity<OrderDto> createOrder(
@RequestBody CreateOrderRequest request) {
// Основной поток отвечает быстро (< 100ms)
Order order = orderService.create(request);
// Email отправляется в фоне
emailService.sendEmail(
order.getCustomerEmail(),
"Order Confirmation",
buildEmailBody(order)
);
return ResponseEntity.ok(OrderMapper.toDto(order));
}
}
3. Synchronized и volatile
synchronized — для защиты критичных секций:
// Проблема: race condition
public class Counter {
private int count = 0;
public void increment() {
count++; // Может быть problem!
// Thread A: читает count=5
// Thread B: читает count=5
// A пишет count=6
// B пишет count=6 (потеряли 1 инкремент!)
}
}
// Решение: synchronized
public class SyncCounter {
private int count = 0;
public synchronized void increment() {
count++; // Теперь безопасно
}
}
// Лучше решение: AtomicInteger
public class AtomicCounter {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // Lock-free, безопаснее
}
}
volatile — видимость между потоками:
// Проблема: кеширование в регистре процессора
public class Flag {
private boolean running = true;
public void stop() {
running = false; // Thread B может не увидеть это!
}
public void loop() {
while (running) { // Бесконечный цикл вThread A
// Thread A кешировал running в регистре
}
}
}
// Решение: volatile
public class VolatileFlag {
private volatile boolean running = true;
public void stop() {
running = false; // Гарантированно видно всем потокам
}
public void loop() {
while (running) { // Каждый раз читаем из памяти
// Правильно работает
}
}
}
4. Locks (ReentrantLock, ReadWriteLock)
ReentrantLock — более мощный, чем synchronized:
// synchronized просто блокирует
public synchronized void criticalSection() {
// Нельзя контролировать, что делать при timeout
}
// ReentrantLock даёт больше контроля
public class BankAccount {
private final ReentrantLock lock = new ReentrantLock();
private BigDecimal balance;
public void withdraw(BigDecimal amount) throws InterruptedException {
// Пытаемся захватить lock с timeout
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
throw new TimeoutException("Could not acquire lock");
}
try {
if (balance.compareTo(amount) >= 0) {
balance = balance.subtract(amount);
} else {
throw new InsufficientFundsException();
}
} finally {
lock.unlock(); // ВАЖНО: всегда unlock
}
}
}
ReadWriteLock — для сценариев "много читеров, мало писателей":
public class UserCache {
private final Map<Long, User> cache = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// Много потоков могут читать одновременно
public User getUser(Long id) {
lock.readLock().lock();
try {
return cache.get(id);
} finally {
lock.readLock().unlock();
}
}
// Писатель блокирует всех
public void putUser(Long id, User user) {
lock.writeLock().lock();
try {
cache.put(id, user);
} finally {
lock.writeLock().unlock();
}
}
}
5. Concurrent Collections
Обычные Collections не thread-safe. Используем concurrent версии:
// ❌ Не thread-safe
private List<String> list = new ArrayList<>(); // Опасно!
// ✅ Thread-safe
private List<String> list = Collections.synchronizedList(new ArrayList<>());
// ✅ Лучше (lock-free)
private List<String> list = new CopyOnWriteArrayList<>();
// ✅ Для очередей (queue)
private Queue<String> queue = new ConcurrentLinkedQueue<>();
// ✅ Для hashmap
private Map<String, String> map = new ConcurrentHashMap<>();
6. CountDownLatch и CyclicBarrier
CountDownLatch — ждём, пока N потоков завершат работу:
public class DataProcessingService {
public void processDataInParallel(List<Data> data) throws InterruptedException {
int threadCount = 4;
CountDownLatch latch = new CountDownLatch(threadCount);
// Запускаем 4 потока
for (int i = 0; i < threadCount; i++) {
int threadId = i;
executor.submit(() -> {
try {
// Обрабатываем свою часть данных
processDataChunk(data, threadId, threadCount);
} finally {
latch.countDown(); // Уменьшаем счётчик
}
});
}
// Ждём, пока все потоки завершатся
latch.await(); // Блокируется, пока latch.count != 0
// Теперь можем обработать результаты
aggregateResults();
}
}
CyclicBarrier — все потоки ждут друг друга на барьере:
public class GameServer {
public void startMultiplayerGame(int playerCount) {
CyclicBarrier barrier = new CyclicBarrier(playerCount, () -> {
// Callback: все игроки готовы, начинаем игру
logger.info("Game started!");
});
for (int i = 0; i < playerCount; i++) {
int playerId = i;
executor.submit(() -> {
loadResources(playerId);
logger.info("Player {} ready", playerId);
try {
barrier.await(); // Ждём остальных
} catch (InterruptedException | BrokenBarrierException e) {
logger.error("Barrier broken", e);
}
playGame();
});
}
}
}
7. Semaphore — управление лимитом ресурсов
public class DatabaseConnectionPool {
// Максимум 10 одновременных соединений
private final Semaphore semaphore = new Semaphore(10);
public void executeQuery(String query) throws InterruptedException {
semaphore.acquire(); // Занимаем место
try {
// Максимум 10 потоков одновременно здесь
Connection conn = getConnection();
executeOnConnection(conn, query);
} finally {
semaphore.release(); // Освобождаем место для других
}
}
}
8. Future и CompletableFuture
@Service
public class OrderService {
private final ExecutorService executor = Executors.newFixedThreadPool(4);
// ❌ Старый способ: Future
public OrderDto createOrderOld(CreateOrderRequest request) {
Future<OrderDto> future = executor.submit(() -> {
return createOrder(request);
});
try {
return future.get(5, TimeUnit.SECONDS); // Блокирует!
} catch (TimeoutException e) {
future.cancel(true);
throw new OrderProcessingException();
}
}
// ✅ Лучше: CompletableFuture (non-blocking)
public CompletableFuture<OrderDto> createOrderNew(
CreateOrderRequest request) {
return CompletableFuture
.supplyAsync(() -> createOrder(request), executor)
.thenApply(order -> {
// После создания заказа
sendConfirmationEmail(order);
return order;
})
.thenApply(order -> {
// После отправки письма
updateAnalytics(order);
return order;
})
.exceptionally(ex -> {
logger.error("Order creation failed", ex);
return null;
});
}
}
// Использование:
@PostMapping("/orders")
public ResponseEntity<CompletableFuture<OrderDto>> createOrder(
@RequestBody CreateOrderRequest request) {
return ResponseEntity.ok(
orderService.createOrderNew(request)
);
}
9. Virtual Threads (Java 21+) — будущее многопоточности
// Java 21+ имеет Virtual Threads (легче, дешевле)
public void processRequestsWithVirtualThreads() {
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// Можем создать 1 000 000 virtual threads без проблем
// С обычными потоками это невозможно (память заканчивается)
for (int i = 0; i < 1_000_000; i++) {
executor.submit(() -> {
processRequest();
});
}
}
}
10. Race Condition Example
// Реальный пример: проблема с счётом денег
public class BankAccount {
private BigDecimal balance = new BigDecimal("1000.00");
public void transfer(BigDecimal amount) {
// Читаем
BigDecimal current = balance;
// Небольшая задержка (в реальном коде: сетевой запрос, БД и т.д.)
Thread.sleep(10);
// Пишем
balance = current.subtract(amount);
}
}
// Thread A: transfer(100) Thread B: transfer(100)
// A: читает 1000 B: читает 1000
// A: спит 10ms B: спит 10ms
// A: пишет 900 B: пишет 900 (потеряли 100!)
// Должна быть 800, а есть 900
// Решение: synchronized
public synchronized void transfer(BigDecimal amount) {
BigDecimal current = balance;
Thread.sleep(10);
balance = current.subtract(amount);
}
Инструменты для отладки многопоточности
# 1. Смотрим все потоки в приложении
jstack <pid>
# 2. ThreadDump в Java
System.out.println(Thread.getAllStackTraces());
# 3. JProfiler или YourKit для детального анализа
# 4. Visualvm встроенная утилита в JDK
Основные принципы
- Минимизируй synchronized блоки — они замедляют производительность
- Используй concurrent collections — они быстрее, чем synchronized List/Map
- Тестируй с нагрузкой — race conditions обнаруживаются только под нагрузкой
- Документируй thread-safety — для каждого класса уточни, потокобезопасен ли он
- Используй modern инструменты — CompletableFuture, Virtual Threads
Вывод: Многопоточность сложная, но необходима для производительности. Главное — писать код, который предусматривает одновременный доступ с самого начала.