Миллион наименьших чисел
Условие
Опишите алгоритм для нахождения миллиона наименьших чисел в наборе из миллиарда чисел.
Обсудите:
- Наивный подход с сортировкой — какая сложность?
- Подход с min-heap — как работает?
- Можно ли решить за линейное время?
Ограничения
- 1 миллиард чисел не помещается в память целиком
- Нужен оптимальный по памяти и времени алгоритм
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Миллион наименьших чисел
Проблема
Найти M наименьших элементов из N элементов (M = 10^6, N = 10^9), когда данные не помещаются в памяти целиком. Это классическая задача поиска k-smallest элементов.
1. Наивный подход с сортировкой — какая сложность?
Идея: Отсортировать всё, взять первые K элементов.
# ❌ Наивный подход
def naive_k_smallest(numbers, k):
return sorted(numbers)[:k]
Анализ сложности:
- Временная: O(N log N) = 10^9 × 30 = ~3×10^10 операций (медленно)
- Пространственная: O(N) = 4GB RAM (не помещается!)
- Вывод: Для миллиарда чисел это неприемлемо
2. Подход с max-heap (приоритетная очередь)
Ключевая идея: Хранить только M элементов в памяти, используя max-heap.
Как работает:
- Создаём max-heap размер M
- Читаем элементы потоком из файла/БД
- Если элемент < максимум в heap → удаляем max, добавляем новый
- После обработки всех N элементов → у нас M наименьших
import heapq
from typing import List, Iterator
def k_smallest_numbers(data_stream: Iterator[int], k: int) -> List[int]:
"""
Найти k наименьших чисел из потока данных.
Временная сложность: O(N log k)
Пространственная: O(k)
"""
# max-heap реализуем как инвертированный min-heap
# Python имеет только min-heap, поэтому используем отрицательные значения
max_heap = []
for num in data_stream:
# Если heap не полон или число меньше максимума
if len(max_heap) < k:
heapq.heappush(max_heap, -num)
elif num < -max_heap[0]: # num < max элемент
heapq.heapreplace(max_heap, -num)
# Результат (отрицательные значения, сортируем в обратном порядке)
return sorted([-x for x in max_heap])
# Использование
result = k_smallest_numbers(range(1000000000), 1000000)
print(result[:10]) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Анализ сложности:
- Временная: O(N log k) = 10^9 × log(10^6) ≈ 10^9 × 20 = 2×10^10 (лучше!)
- Пространственная: O(k) = 4MB (храним только 1М чисел)
- Почему это работает: на каждой операции heap имеет ~20 операций, не N log N
Сравнение с сортировкой:
- Сортировка: 30B операций + 4GB памяти
- Heap: 2B операций + 4MB памяти ✓
3. Можно ли решить за линейное время?
Да! Используя QuickSelect алгоритм (выбор k-го элемента за O(N) в среднем).
Идея: Это как QuickSort, но мы рекурсивно ищем только левую или правую часть.
import random
from typing import List
def quickselect(arr: List[int], k: int) -> int:
"""
Найти k-й наименьший элемент за O(N) в среднем.
k = 0 → минимум, k = len(arr)-1 → максимум
"""
if len(arr) == 1:
return arr[0]
# Выбираем случайный pivot
pivot_index = random.randint(0, len(arr) - 1)
pivot = arr[pivot_index]
# Разбиваем на две части
left = [x for x in arr if x < pivot]
middle = [x for x in arr if x == pivot]
right = [x for x in arr if x > pivot]
# Определяем, в какой части находится k-й элемент
if k < len(left):
return quickselect(left, k)
elif k < len(left) + len(middle):
return middle[0]
else:
return quickselect(right, k - len(left) - len(middle))
# Найти медиану (n//2-й элемент)
median = quickselect([5, 2, 8, 1, 9], 2) # O(N) в среднем
Для M наименьших чисел:
def k_smallest_quickselect(data_stream, k: int) -> List[int]:
"""
Использовать QuickSelect для нахождения k-го элемента,
затем вернуть все элементы <= k-го.
"""
# Шаг 1: Читаем весь поток в массив (или используем external sorting)
data = list(data_stream) # Если не помещается → external sorting!
# Шаг 2: Найти k-й элемент за O(N)
kth_element = quickselect(data, k - 1)
# Шаг 3: Вернуть все элементы <= kth_element
result = [x for x in data if x <= kth_element]
result.sort()
return result[:k]
# Сложность: O(N) в среднем
Но есть подводный камень: Если данные не помещаются в памяти, нужно использовать External Sorting (merge sort на диске).
4. External Sorting для 1 миллиарда чисел
def external_sort_find_k_smallest(filename: str, k: int, chunk_size: int = 100_000_000):
"""
Когда 1B чисел не помещается в памяти:
1. Разбиваем на чанки
2. Сортируем каждый чанк
3. Мержим в порядке, беря только K элементов
"""
import heapq
# Шаг 1: Разбиваем на отсортированные чанки
chunk_files = []
with open(filename, 'r') as f:
chunk = []
for line in f:
chunk.append(int(line.strip()))
if len(chunk) == chunk_size:
chunk.sort()
chunk_file = f"chunk_{len(chunk_files)}.txt"
with open(chunk_file, 'w') as cf:
cf.write('\n'.join(map(str, chunk)))
chunk_files.append(chunk_file)
chunk = []
# Последний чанк
if chunk:
chunk.sort()
chunk_file = f"chunk_{len(chunk_files)}.txt"
with open(chunk_file, 'w') as cf:
cf.write('\n'.join(map(str, chunk)))
chunk_files.append(chunk_file)
# Шаг 2: Мержим K-way merge
result = []
file_handles = [open(f, 'r') for f in chunk_files]
min_heap = []
# Инициализируем heap первыми элементами из каждого файла
for i, f in enumerate(file_handles):
line = f.readline()
if line:
heapq.heappush(min_heap, (int(line.strip()), i))
# Читаем K элементов
while result and len(result) < k:
val, file_idx = heapq.heappop(min_heap)
result.append(val)
line = file_handles[file_idx].readline()
if line:
heapq.heappush(min_heap, (int(line.strip()), file_idx))
for f in file_handles:
f.close()
return result
Сравнение всех подходов
| Подход | Время | Память | Когда использовать |
|---|---|---|---|
| Сортировка | O(N log N) | O(N) | Когда K близко к N |
| Max-Heap | O(N log K) | O(K) | Стандартный выбор |
| QuickSelect | O(N) avg | O(N) | Если данные в памяти |
| External Sort | O(N log N) | O(1) | Данные на диске |
Рекомендация
Для 1 млрд чисел найти 1М наименьших:
- Если данные в потоке (не помещаются в памяти) → max-heap подход (O(N log K), O(K) памяти)
- Если все данные можно загрузить → QuickSelect (O(N) в среднем)
- Если данные на диске → External merge sort