← Назад к вопросам
Что делает метод parallelStream?
2.0 Middle🔥 151 комментариев
#Stream API и функциональное программирование#Многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Метод parallelStream: параллельная обработка данных
parallelStream() — это метод на коллекциях (List, Set) который возвращает параллельный поток (parallel stream) для обработки данных на нескольких потоках одновременно. Используется для ускорения обработки больших объёмов данных.
1. Основное использование
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Обычный stream - последовательный
List<Integer> sequential = numbers.stream()
.map(x -> x * 2)
.collect(Collectors.toList());
// Обработка: 1->2, 2->4, 3->6, ..., 10->20 (по очереди)
// Параллельный stream - на нескольких потоках
List<Integer> parallel = numbers.parallelStream()
.map(x -> x * 2)
.collect(Collectors.toList());
// Обработка: несколько потоков одновременно обрабатывают разные элементы
2. Как работает параллельный stream
public class ParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
System.out.println("Sequential:");
numbers.stream()
.peek(x -> System.out.println("Thread: " + Thread.currentThread().getName()))
.map(x -> x * 2)
.forEach(System.out::println);
// Output: все выполняется в main потоке
System.out.println("\nParallel:");
numbers.parallelStream()
.peek(x -> System.out.println("Thread: " + Thread.currentThread().getName()))
.map(x -> x * 2)
.forEach(System.out::println);
// Output: может быть ForkJoinPool-1, ForkJoinPool-2 и т.д.
}
}
3. ForkJoinPool - механизм параллелизма
паралlelStream использует ForkJoinPool для управления потоками:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
// Процесс:
// 1. Разделить (Fork) данные на подзадачи
// 2. Обработать на разных потоках
// 3. Объединить (Join) результаты
long sum = data.parallelStream()
.mapToLong(Integer::longValue)
.sum();
// ForkJoinPool разделит список, обработает параллельно, объединит
4. Когда параллельный stream эффективен
// ХОРОШО: большой объём данных
List<Integer> largeList = new ArrayList<>();
for (int i = 0; i < 1_000_000; i++) {
largeList.add(i);
}
long start = System.currentTimeMillis();
long resultParallel = largeList.parallelStream()
.filter(x -> x % 2 == 0)
.map(x -> x * x)
.sum();
long parallelTime = System.currentTimeMillis() - start;
start = System.currentTimeMillis();
long resultSequential = largeList.stream()
.filter(x -> x % 2 == 0)
.map(x -> x * x)
.sum();
long sequentialTime = System.currentTimeMillis() - start;
System.out.println("Parallel: " + parallelTime + "ms");
System.out.println("Sequential: " + sequentialTime + "ms");
// Для больших данных parallelStream может быть в 2-4 раза быстрее
5. Когда параллельный stream НЕ эффективен
// ПЛОХО: маленькое количество элементов
List<Integer> smallList = Arrays.asList(1, 2, 3, 4, 5);
long result = smallList.parallelStream()
.map(x -> x * 2)
.sum();
// Overhead создания потоков может быть больше чем экономия времени
// ПЛОХО: дорогая операция синхронизации
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
List<String> result = new ArrayList<>();
list.parallelStream()
.forEach(x -> {
result.add(String.valueOf(x)); // UNSAFE! Race condition
});
// Потоки будут бороться за доступ к ArrayList
6. Потокобезопасность и сбор результатов
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
// БЕЗОПАСНО: используй Collector
List<String> safe = numbers.parallelStream()
.map(x -> "Item: " + x)
.collect(Collectors.toList());
// Collector обрабатывает синхронизацию
// ОПАСНО: мутирование общего объекта
List<String> unsafe = new ArrayList<>();
numbers.parallelStream()
.forEach(x -> unsafe.add("Item: " + x));
// Race condition! Разные потоки пишут одновременно
// БЕЗОПАСНО: reduce для объединения результатов
String result = numbers.parallelStream()
.map(Object::toString)
.reduce("", (a, b) -> a + "," + b);
// reduce безопасен для параллельных потоков
7. Пример: обработка больших данных
public class LargeDataProcessing {
public static void main(String[] args) {
// Список из миллиона объектов
List<Person> people = generateMillionPeople();
// Последовательная обработка
long start = System.currentTimeMillis();
long adultCount = people.stream()
.filter(p -> p.getAge() >= 18)
.map(Person::getName)
.peek(name -> processName(name)) // дорогая операция
.count();
System.out.println("Sequential: " + (System.currentTimeMillis() - start) + "ms");
// Параллельная обработка
start = System.currentTimeMillis();
adultCount = people.parallelStream()
.filter(p -> p.getAge() >= 18)
.map(Person::getName)
.peek(name -> processName(name))
.count();
System.out.println("Parallel: " + (System.currentTimeMillis() - start) + "ms");
}
private static void processName(String name) {
try { Thread.sleep(1); } catch (InterruptedException e) {}
}
}
8. Управление потокообразом ForkJoinPool
// По умолчанию используется ForkJoinPool.commonPool()
// Его размер = количество процессоров - 1
// Можно указать своё количество потоков через system property
// java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 MyApp
// Или создать свой ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(8); // 8 потоков
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
long result = customPool.invoke(
numbers.parallelStream()
.mapToLong(x -> x * x)
.sum()
);
9. Статные vs безопасные операции
// НЕБЕЗОПАСНО: stateful operation
List<Integer> list = new ArrayList<>();
numbers.parallelStream()
.map(x -> {
list.add(x); // Разные потоки пишут одновременно
return x * 2;
})
.collect(Collectors.toList());
// БЕЗОПАСНО: stateless operation
List<Integer> result = numbers.parallelStream()
.map(x -> x * 2) // Чистая функция, нет побочных эффектов
.collect(Collectors.toList());
10. Когда использовать parallelStream
ИСПОЛЬЗОВАТЬ:
- Обработка больших списков (> 10000 элементов)
- Каждая операция занимает значительное время
- Данные хранятся в хорошо распределяемых структурах (ArrayList, HashSet)
- Нет зависимостей между элементами
НЕ ИСПОЛЬЗОВАТЬ:
- Маленькие списки (< 1000 элементов)
- Быстрые операции (простая фильтрация, маппинг)
- Данные в LinkedList (плохая локальность кеша)
- Дорогие операции ввода-вывода (I/O)
- Необходимо упорядочивание результатов
Вывод
паралlelStream() обрабатывает данные параллельно на нескольких потоках через ForkJoinPool, что может значительно ускорить обработку больших объёмов данных. Однако это эффективно только при определённых условиях — большой объём данных, дорогие операции, отсутствие синхронизации. Для маленьких списков параллелизм добавляет только overhead.