← Назад к вопросам

Миллион наименьших чисел

2.2 Middle🔥 191 комментариев
#Тестирование

Условие

Опишите алгоритм для нахождения миллиона наименьших чисел в наборе из миллиарда чисел.

Обсудите:

  1. Наивный подход с сортировкой — какая сложность?
  2. Подход с min-heap — как работает?
  3. Можно ли решить за линейное время?

Ограничения

  • 1 миллиард чисел не помещается в память целиком
  • Нужен оптимальный по памяти и времени алгоритм

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Миллион наименьших чисел

Проблема

Найти M наименьших элементов из N элементов (M = 10^6, N = 10^9), когда данные не помещаются в памяти целиком. Это классическая задача поиска k-smallest элементов.

1. Наивный подход с сортировкой — какая сложность?

Идея: Отсортировать всё, взять первые K элементов.

# ❌ Наивный подход
def naive_k_smallest(numbers, k):
    return sorted(numbers)[:k]

Анализ сложности:

  • Временная: O(N log N) = 10^9 × 30 = ~3×10^10 операций (медленно)
  • Пространственная: O(N) = 4GB RAM (не помещается!)
  • Вывод: Для миллиарда чисел это неприемлемо

2. Подход с max-heap (приоритетная очередь)

Ключевая идея: Хранить только M элементов в памяти, используя max-heap.

Как работает:

  1. Создаём max-heap размер M
  2. Читаем элементы потоком из файла/БД
  3. Если элемент < максимум в heap → удаляем max, добавляем новый
  4. После обработки всех N элементов → у нас M наименьших
import heapq
from typing import List, Iterator

def k_smallest_numbers(data_stream: Iterator[int], k: int) -> List[int]:
    """
    Найти k наименьших чисел из потока данных.
    
    Временная сложность: O(N log k)
    Пространственная: O(k)
    """
    # max-heap реализуем как инвертированный min-heap
    # Python имеет только min-heap, поэтому используем отрицательные значения
    max_heap = []
    
    for num in data_stream:
        # Если heap не полон или число меньше максимума
        if len(max_heap) < k:
            heapq.heappush(max_heap, -num)
        elif num < -max_heap[0]:  # num < max элемент
            heapq.heapreplace(max_heap, -num)
    
    # Результат (отрицательные значения, сортируем в обратном порядке)
    return sorted([-x for x in max_heap])

# Использование
result = k_smallest_numbers(range(1000000000), 1000000)
print(result[:10])  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Анализ сложности:

  • Временная: O(N log k) = 10^9 × log(10^6) ≈ 10^9 × 20 = 2×10^10 (лучше!)
  • Пространственная: O(k) = 4MB (храним только 1М чисел)
  • Почему это работает: на каждой операции heap имеет ~20 операций, не N log N

Сравнение с сортировкой:

  • Сортировка: 30B операций + 4GB памяти
  • Heap: 2B операций + 4MB памяти ✓

3. Можно ли решить за линейное время?

Да! Используя QuickSelect алгоритм (выбор k-го элемента за O(N) в среднем).

Идея: Это как QuickSort, но мы рекурсивно ищем только левую или правую часть.

import random
from typing import List

def quickselect(arr: List[int], k: int) -> int:
    """
    Найти k-й наименьший элемент за O(N) в среднем.
    k = 0 → минимум, k = len(arr)-1 → максимум
    """
    if len(arr) == 1:
        return arr[0]
    
    # Выбираем случайный pivot
    pivot_index = random.randint(0, len(arr) - 1)
    pivot = arr[pivot_index]
    
    # Разбиваем на две части
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    
    # Определяем, в какой части находится k-й элемент
    if k < len(left):
        return quickselect(left, k)
    elif k < len(left) + len(middle):
        return middle[0]
    else:
        return quickselect(right, k - len(left) - len(middle))

# Найти медиану (n//2-й элемент)
median = quickselect([5, 2, 8, 1, 9], 2)  # O(N) в среднем

Для M наименьших чисел:

def k_smallest_quickselect(data_stream, k: int) -> List[int]:
    """
    Использовать QuickSelect для нахождения k-го элемента,
    затем вернуть все элементы <= k-го.
    """
    # Шаг 1: Читаем весь поток в массив (или используем external sorting)
    data = list(data_stream)  # Если не помещается → external sorting!
    
    # Шаг 2: Найти k-й элемент за O(N)
    kth_element = quickselect(data, k - 1)
    
    # Шаг 3: Вернуть все элементы <= kth_element
    result = [x for x in data if x <= kth_element]
    result.sort()
    
    return result[:k]

# Сложность: O(N) в среднем

Но есть подводный камень: Если данные не помещаются в памяти, нужно использовать External Sorting (merge sort на диске).

4. External Sorting для 1 миллиарда чисел

def external_sort_find_k_smallest(filename: str, k: int, chunk_size: int = 100_000_000):
    """
    Когда 1B чисел не помещается в памяти:
    1. Разбиваем на чанки
    2. Сортируем каждый чанк
    3. Мержим в порядке, беря только K элементов
    """
    import heapq
    
    # Шаг 1: Разбиваем на отсортированные чанки
    chunk_files = []
    with open(filename, 'r') as f:
        chunk = []
        for line in f:
            chunk.append(int(line.strip()))
            if len(chunk) == chunk_size:
                chunk.sort()
                chunk_file = f"chunk_{len(chunk_files)}.txt"
                with open(chunk_file, 'w') as cf:
                    cf.write('\n'.join(map(str, chunk)))
                chunk_files.append(chunk_file)
                chunk = []
        
        # Последний чанк
        if chunk:
            chunk.sort()
            chunk_file = f"chunk_{len(chunk_files)}.txt"
            with open(chunk_file, 'w') as cf:
                cf.write('\n'.join(map(str, chunk)))
            chunk_files.append(chunk_file)
    
    # Шаг 2: Мержим K-way merge
    result = []
    file_handles = [open(f, 'r') for f in chunk_files]
    min_heap = []
    
    # Инициализируем heap первыми элементами из каждого файла
    for i, f in enumerate(file_handles):
        line = f.readline()
        if line:
            heapq.heappush(min_heap, (int(line.strip()), i))
    
    # Читаем K элементов
    while result and len(result) < k:
        val, file_idx = heapq.heappop(min_heap)
        result.append(val)
        
        line = file_handles[file_idx].readline()
        if line:
            heapq.heappush(min_heap, (int(line.strip()), file_idx))
    
    for f in file_handles:
        f.close()
    
    return result

Сравнение всех подходов

ПодходВремяПамятьКогда использовать
СортировкаO(N log N)O(N)Когда K близко к N
Max-HeapO(N log K)O(K)Стандартный выбор
QuickSelectO(N) avgO(N)Если данные в памяти
External SortO(N log N)O(1)Данные на диске

Рекомендация

Для 1 млрд чисел найти 1М наименьших:

  1. Если данные в потоке (не помещаются в памяти)max-heap подход (O(N log K), O(K) памяти)
  2. Если все данные можно загрузитьQuickSelect (O(N) в среднем)
  3. Если данные на дискеExternal merge sort