Какие задачи решал с помощью многопоточности
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Задачи решённые с помощью многопоточности
В процессе своей работы я использовал многопоточность для решения различных задач в Java приложениях. Расскажу о конкретных сценариях и реализациях.
1. Обработка большого количества HTTP запросов
Задача: WebSocket сервер должен одновременно обслуживать тысячи клиентов
public class WebSocketServer {
private final ExecutorService executorService =
Executors.newFixedThreadPool(100);
private final ServerSocket serverSocket;
public WebSocketServer(int port) throws IOException {
this.serverSocket = new ServerSocket(port);
}
public void start() {
while (true) {
try {
Socket clientSocket = serverSocket.accept();
// Каждый клиент обслуживается отдельным потоком
executorService.submit(new ClientHandler(clientSocket));
} catch (IOException e) {
e.printStackTrace();
}
}
}
private static class ClientHandler implements Runnable {
private final Socket socket;
public ClientHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(socket.getInputStream()))) {
String message;
while ((message = reader.readLine()) != null) {
// Обработка сообщения от клиента
processMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void processMessage(String message) {
// Долгая операция (IO, БД)
System.out.println("Обработка: " + message);
}
}
}
2. Асинхронная обработка очереди задач (Job Processing)
Задача: Обработка фоновых задач (рассылка писем, генерация отчётов) без блокирования основного потока
@Service
public class EmailQueueProcessor {
private final ExecutorService executorService =
Executors.newFixedThreadPool(5);
private final EmailService emailService;
private final EmailQueue emailQueue;
@PostConstruct
public void startProcessing() {
// Запускаем обработчик очереди в отдельном потоке
executorService.submit(new QueueProcessor());
}
private class QueueProcessor implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
EmailTask task = emailQueue.take(); // Блокирующий вызов
emailService.sendEmail(task.getEmail(), task.getSubject());
emailQueue.markAsProcessed(task.getId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
3. Параллельная обработка большого набора данных
Задача: Обработать 1 миллион записей из файла быстрее чем за секунду
public class DataProcessor {
public void processLargeDataset(String filename) {
ExecutorService executorService = Executors.newFixedThreadPool(8);
List<Future<ProcessingResult>> futures = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(filename))) {
List<String> batch = new ArrayList<>();
String line;
while ((line = reader.readLine()) != null) {
batch.add(line);
// Обрабатываем батчами по 10000 записей в отдельных потоках
if (batch.size() == 10000) {
List<String> currentBatch = new ArrayList<>(batch);
futures.add(executorService.submit(() ->
processBatch(currentBatch)
));
batch.clear();
}
}
// Обработка последнего батча
if (!batch.isEmpty()) {
futures.add(executorService.submit(() -> processBatch(batch)));
}
// Ждём завершения всех задач
for (Future<ProcessingResult> future : futures) {
ProcessingResult result = future.get(); // Блокирующий вызов
System.out.println("Обработано " + result.getCount() + " записей");
}
} finally {
executorService.shutdown();
}
}
private ProcessingResult processBatch(List<String> batch) {
int processedCount = 0;
for (String line : batch) {
// Обработка каждой строки
transformData(line);
processedCount++;
}
return new ProcessingResult(processedCount);
}
}
4. Асинхронная передача данных между микросервисами
Задача: Одновременно отправлять данные на несколько сервисов без блокирования
@Service
public class DataReplicator {
private final RestTemplate restTemplate;
private final ExecutorService executorService;
public DataReplicator() {
this.executorService = Executors.newFixedThreadPool(10);
}
public void replicateDataToMultipleServices(Data data,
List<String> serviceUrls) {
List<CompletableFuture<Void>> futures = serviceUrls.stream()
.map(url -> CompletableFuture.runAsync(
() -> sendDataToService(url, data),
executorService
))
.collect(Collectors.toList());
// Ждём завершения всех отправок
CompletableFuture.allOf(futures.toArray(
new CompletableFuture[0]
)).join();
}
private void sendDataToService(String url, Data data) {
try {
restTemplate.postForObject(
url + "/data",
data,
String.class
);
System.out.println("Отправлено на " + url);
} catch (Exception e) {
System.err.println("Ошибка отправки на " + url + ": " + e.getMessage());
}
}
}
5. Планирование повторяющихся задач
Задача: Запускать healthcheck каждые 30 секунд и обновлять кэш каждый час
@Service
public class ScheduledTaskService {
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(2);
private final CacheService cacheService;
private final HealthCheckService healthCheck;
@PostConstruct
public void initScheduledTasks() {
// Healthcheck каждые 30 секунд
scheduler.scheduleAtFixedRate(
this::performHealthCheck,
0, // начальная задержка
30, // интервал
TimeUnit.SECONDS
);
// Обновление кэша каждый час
scheduler.scheduleAtFixedRate(
this::refreshCache,
0,
1,
TimeUnit.HOURS
);
}
private void performHealthCheck() {
System.out.println("[" + Thread.currentThread().getName() + "] Healthcheck...");
if (!healthCheck.isServiceHealthy()) {
System.err.println("Сервис нездоров!");
restartService();
}
}
private void refreshCache() {
System.out.println("[" + Thread.currentThread().getName() + "] Обновление кэша...");
cacheService.refresh();
}
private void restartService() {
// Логика перезагрузки
}
}
6. Обработка real-time данных с множественными источниками
Задача: Получать данные с 3 датчиков одновременно, агрегировать и анализировать
public class SensorDataAggregator {
private final ExecutorService executorService =
Executors.newFixedThreadPool(3);
public SensorReadings collectSensorData(List<Sensor> sensors) {
List<Future<SensorReading>> futures = sensors.stream()
.map(sensor -> executorService.submit(() ->
sensor.readData() // Блокирующий I/O
))
.collect(Collectors.toList());
SensorReadings readings = new SensorReadings();
int index = 0;
for (Future<SensorReading> future : futures) {
try {
SensorReading reading = future.get(5, TimeUnit.SECONDS);
readings.add(reading);
} catch (TimeoutException e) {
System.err.println("Датчик " + index + " не ответил вовремя");
futures.get(index).cancel(true);
} catch (Exception e) {
System.err.println("Ошибка чтения датчика: " + e.getMessage());
}
index++;
}
return readings;
}
}
7. Producer-Consumer паттерн
Задача: Одни потоки генерируют данные, другие обрабатывают
public class ProducerConsumerExample {
private final BlockingQueue<Data> queue = new LinkedBlockingQueue<>(100);
private final ExecutorService executorService =
Executors.newFixedThreadPool(5);
public void start() {
// 2 producer потока
executorService.submit(new Producer("Producer-1"));
executorService.submit(new Producer("Producer-2"));
// 3 consumer потока
executorService.submit(new Consumer("Consumer-1"));
executorService.submit(new Consumer("Consumer-2"));
executorService.submit(new Consumer("Consumer-3"));
}
private class Producer implements Runnable {
private final String name;
public Producer(String name) {
this.name = name;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
Data data = generateData(i);
queue.put(data); // Блокирует, если очередь полна
System.out.println(name + " произвёл " + i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private class Consumer implements Runnable {
private final String name;
public Consumer(String name) {
this.name = name;
}
@Override
public void run() {
try {
while (true) {
Data data = queue.take(); // Ждёт, если очередь пуста
processData(data);
System.out.println(name + " обработал данные");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
8. Параллельный парсинг больших XML/JSON файлов
Задача: Парсить большой JSON файл (2GB), разделив на части
public class LargeJsonParser {
private final ExecutorService executorService =
Executors.newFixedThreadPool(4);
public void parseJsonInParallel(String filename) {
try (BufferedReader reader = new BufferedReader(
new FileReader(filename), 65536)) {
List<Future<JsonObject>> futures = new ArrayList<>();
StringBuilder jsonLine = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
jsonLine.append(line);
if (isCompleteJsonObject(jsonLine)) {
final String json = jsonLine.toString();
futures.add(executorService.submit(() ->
parseJsonObject(json)
));
jsonLine = new StringBuilder();
}
}
// Обработка результатов
for (Future<JsonObject> future : futures) {
JsonObject obj = future.get();
saveToDatabase(obj);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
9. Синхронизация доступа к общему ресурсу
Задача: Несколько потоков работают с общим счётчиком
public class AtomicCounterExample {
private final AtomicInteger counter = new AtomicInteger(0);
private final ExecutorService executorService =
Executors.newFixedThreadPool(10);
public void demonstrateThreadSafety() {
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
for (int j = 0; j < 100; j++) {
counter.incrementAndGet(); // Потокобезопасное увеличение
}
});
}
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("Итоговое значение счётчика: " + counter.get());
// Результат: 10000 (100 потоков * 100 операций)
}
}
Ключевые инструменты для многопоточности
✅ ExecutorService - управление пулом потоков ✅ Future - получение результатов из асинхронных задач ✅ BlockingQueue - безопасная передача данных между потоками ✅ ScheduledExecutorService - планирование повторяющихся задач ✅ CompletableFuture - построение асинхронных цепочек ✅ AtomicInteger/AtomicReference - безопасные операции без синхронизации ✅ ReadWriteLock - оптимизированная синхронизация для чтение-записи ✅ CountDownLatch - синхронизация нескольких потоков ✅ CyclicBarrier - ограничитель потоков в точке встречи
Многопоточность - это критический навык для создания масштабируемых и отзывчивых приложений на Java.