← Назад к вопросам

Что делает метод parallelStream?

2.0 Middle🔥 151 комментариев
#Stream API и функциональное программирование#Многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Метод parallelStream: параллельная обработка данных

parallelStream() — это метод на коллекциях (List, Set) который возвращает параллельный поток (parallel stream) для обработки данных на нескольких потоках одновременно. Используется для ускорения обработки больших объёмов данных.

1. Основное использование

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// Обычный stream - последовательный
List<Integer> sequential = numbers.stream()
    .map(x -> x * 2)
    .collect(Collectors.toList());
// Обработка: 1->2, 2->4, 3->6, ..., 10->20 (по очереди)

// Параллельный stream - на нескольких потоках
List<Integer> parallel = numbers.parallelStream()
    .map(x -> x * 2)
    .collect(Collectors.toList());
// Обработка: несколько потоков одновременно обрабатывают разные элементы

2. Как работает параллельный stream

public class ParallelStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        
        System.out.println("Sequential:");
        numbers.stream()
            .peek(x -> System.out.println("Thread: " + Thread.currentThread().getName()))
            .map(x -> x * 2)
            .forEach(System.out::println);
        // Output: все выполняется в main потоке
        
        System.out.println("\nParallel:");
        numbers.parallelStream()
            .peek(x -> System.out.println("Thread: " + Thread.currentThread().getName()))
            .map(x -> x * 2)
            .forEach(System.out::println);
        // Output: может быть ForkJoinPool-1, ForkJoinPool-2 и т.д.
    }
}

3. ForkJoinPool - механизм параллелизма

паралlelStream использует ForkJoinPool для управления потоками:

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

// Процесс:
// 1. Разделить (Fork) данные на подзадачи
// 2. Обработать на разных потоках
// 3. Объединить (Join) результаты

long sum = data.parallelStream()
    .mapToLong(Integer::longValue)
    .sum();
// ForkJoinPool разделит список, обработает параллельно, объединит

4. Когда параллельный stream эффективен

// ХОРОШО: большой объём данных
List<Integer> largeList = new ArrayList<>();
for (int i = 0; i < 1_000_000; i++) {
    largeList.add(i);
}

long start = System.currentTimeMillis();
long resultParallel = largeList.parallelStream()
    .filter(x -> x % 2 == 0)
    .map(x -> x * x)
    .sum();
long parallelTime = System.currentTimeMillis() - start;

start = System.currentTimeMillis();
long resultSequential = largeList.stream()
    .filter(x -> x % 2 == 0)
    .map(x -> x * x)
    .sum();
long sequentialTime = System.currentTimeMillis() - start;

System.out.println("Parallel: " + parallelTime + "ms");
System.out.println("Sequential: " + sequentialTime + "ms");
// Для больших данных parallelStream может быть в 2-4 раза быстрее

5. Когда параллельный stream НЕ эффективен

// ПЛОХО: маленькое количество элементов
List<Integer> smallList = Arrays.asList(1, 2, 3, 4, 5);

long result = smallList.parallelStream()
    .map(x -> x * 2)
    .sum();
// Overhead создания потоков может быть больше чем экономия времени

// ПЛОХО: дорогая операция синхронизации
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
List<String> result = new ArrayList<>();

list.parallelStream()
    .forEach(x -> {
        result.add(String.valueOf(x)); // UNSAFE! Race condition
    });
// Потоки будут бороться за доступ к ArrayList

6. Потокобезопасность и сбор результатов

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

// БЕЗОПАСНО: используй Collector
List<String> safe = numbers.parallelStream()
    .map(x -> "Item: " + x)
    .collect(Collectors.toList());
// Collector обрабатывает синхронизацию

// ОПАСНО: мутирование общего объекта
List<String> unsafe = new ArrayList<>();
numbers.parallelStream()
    .forEach(x -> unsafe.add("Item: " + x));
// Race condition! Разные потоки пишут одновременно

// БЕЗОПАСНО: reduce для объединения результатов
String result = numbers.parallelStream()
    .map(Object::toString)
    .reduce("", (a, b) -> a + "," + b);
// reduce безопасен для параллельных потоков

7. Пример: обработка больших данных

public class LargeDataProcessing {
    public static void main(String[] args) {
        // Список из миллиона объектов
        List<Person> people = generateMillionPeople();
        
        // Последовательная обработка
        long start = System.currentTimeMillis();
        long adultCount = people.stream()
            .filter(p -> p.getAge() >= 18)
            .map(Person::getName)
            .peek(name -> processName(name)) // дорогая операция
            .count();
        System.out.println("Sequential: " + (System.currentTimeMillis() - start) + "ms");
        
        // Параллельная обработка
        start = System.currentTimeMillis();
        adultCount = people.parallelStream()
            .filter(p -> p.getAge() >= 18)
            .map(Person::getName)
            .peek(name -> processName(name))
            .count();
        System.out.println("Parallel: " + (System.currentTimeMillis() - start) + "ms");
    }
    
    private static void processName(String name) {
        try { Thread.sleep(1); } catch (InterruptedException e) {}
    }
}

8. Управление потокообразом ForkJoinPool

// По умолчанию используется ForkJoinPool.commonPool()
// Его размер = количество процессоров - 1

// Можно указать своё количество потоков через system property
// java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 MyApp

// Или создать свой ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(8); // 8 потоков

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

long result = customPool.invoke(
    numbers.parallelStream()
        .mapToLong(x -> x * x)
        .sum()
);

9. Статные vs безопасные операции

// НЕБЕЗОПАСНО: stateful operation
List<Integer> list = new ArrayList<>();
numbers.parallelStream()
    .map(x -> {
        list.add(x); // Разные потоки пишут одновременно
        return x * 2;
    })
    .collect(Collectors.toList());

// БЕЗОПАСНО: stateless operation
List<Integer> result = numbers.parallelStream()
    .map(x -> x * 2)  // Чистая функция, нет побочных эффектов
    .collect(Collectors.toList());

10. Когда использовать parallelStream

ИСПОЛЬЗОВАТЬ:

  • Обработка больших списков (> 10000 элементов)
  • Каждая операция занимает значительное время
  • Данные хранятся в хорошо распределяемых структурах (ArrayList, HashSet)
  • Нет зависимостей между элементами

НЕ ИСПОЛЬЗОВАТЬ:

  • Маленькие списки (< 1000 элементов)
  • Быстрые операции (простая фильтрация, маппинг)
  • Данные в LinkedList (плохая локальность кеша)
  • Дорогие операции ввода-вывода (I/O)
  • Необходимо упорядочивание результатов

Вывод

паралlelStream() обрабатывает данные параллельно на нескольких потоках через ForkJoinPool, что может значительно ускорить обработку больших объёмов данных. Однако это эффективно только при определённых условиях — большой объём данных, дорогие операции, отсутствие синхронизации. Для маленьких списков параллелизм добавляет только overhead.