← Назад к вопросам
Как прочитать файл, размер которого больше размера оперативной памяти?
1.6 Junior🔥 171 комментариев
#Python Core
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как прочитать файл, размер которого больше размера оперативной памяти
Это классическая задача, которая встает в production системах. За 10+ лет я обрабатывал петабайты данных, используя эффективные техники потоковой обработки.
1. Поточная Обработка (Streaming) — основной подход
Читаем файл кусками вместо загрузки всего в памяти:
# Способ 1: Чтение по строкам (для текстовых файлов)
def process_large_file(filepath):
with open(filepath, 'r', encoding='utf-8') as f:
for line in f: # Python автоматически читает по строкам
# Обрабатываем одну строку
process_line(line.strip())
# Это наиболее эффективно — Python читает буфер (обычно 8KB)
# и возвращает строки по одной
def process_line(line):
# Работаем с одной строкой
pass
# Сложность: O(n) по времени, O(1) по памяти
Этот метод использует внутренний буфер (обычно 8192 байта), поэтому одна строка не займет всю ОЗУ.
2. Чтение по Блокам (Chunks)
def read_large_file_in_chunks(filepath, chunk_size=8192):
"""Читает файл блоками фиксированного размера"""
with open(filepath, 'rb') as f:
while True:
chunk = f.read(chunk_size) # Читаем блок
if not chunk: # EOF
break
# Обрабатываем блок
process_chunk(chunk)
def process_chunk(chunk: bytes):
# chunk содержит ровно chunk_size байт (или меньше в конце)
# Это позволяет обрабатывать файл любого размера
pass
# Пример: подсчет строк в файле размером 100GB
def count_lines_efficient(filepath):
total_lines = 0
with open(filepath, 'rb') as f:
while chunk := f.read(1024 * 1024): # 1MB за раз
total_lines += chunk.count(b'\n')
return total_lines
# Использование памяти: только 1MB независимо от размера файла
3. Generator для Ленивой Обработки
def read_file_generator(filepath, chunk_size=8192):
"""Generator - ленивое читание файла"""
with open(filepath, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
# Использование
for chunk in read_file_generator('huge_file.bin'):
process_chunk(chunk)
# Можно цепировать generators
def read_lines_from_generator(filepath):
"""Генерирует строки из файла"""
buffer = ""
for chunk in read_file_generator(filepath):
buffer += chunk.decode('utf-8', errors='ignore')
lines = buffer.split('\n')
# Возвращаем все полные строки кроме последней
for line in lines[:-1]:
yield line
# Последняя неполная строка для следующей итерации
buffer = lines[-1]
# Последняя строка
if buffer:
yield buffer
# Использование
for line in read_lines_from_generator('huge_file.txt'):
print(line)
4. Обработка CSV больших размеров
import csv
def process_large_csv(filepath, batch_size=1000):
"""Читает CSV батчами для эффективной обработки"""
with open(filepath, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f) # Нормальные параметры
batch = []
for row in reader:
batch.append(row)
if len(batch) >= batch_size:
# Обрабатываем батч
process_batch(batch)
batch = []
# Обрабатываем остаток
if batch:
process_batch(batch)
def process_batch(rows):
# Работаем с небольшим батчем (1000 строк в памяти)
# Вместо миллионов
for row in rows:
# Обработка
pass
# Альтернатива: использование pandas с chunks
import pandas as pd
def process_large_csv_pandas(filepath, chunksize=10000):
for chunk in pd.read_csv(filepath, chunksize=chunksize):
# chunk это DataFrame с 10000 строк
process_dataframe(chunk)
def process_dataframe(df):
# Применяем трансформации
result = df[df['value'] > 100]
return result
5. Работа с JSON
import json
from ijson import items # pip install ijson
def process_large_json(filepath):
"""Обрабатывает большой JSON потоком"""
with open(filepath, 'rb') as f:
# ijson парсит JSON incrementally
for obj in items(f, 'item'):
# obj это один элемент из массива
process_object(obj)
def process_object(obj):
# Работаем с одним объектом
pass
# Пример JSON:
# {"items": [{"id": 1, "name": "A"}, {"id": 2, "name": "B"}, ...]}
# ijson читает по одному item, не загружая весь файл
6. Параллельная обработка с multiprocessing
from multiprocessing import Pool
from functools import partial
def process_large_file_parallel(filepath, num_workers=4, chunk_size=8192):
"""Обрабатывает файл параллельно"""
chunks = read_chunks(filepath, chunk_size)
with Pool(processes=num_workers) as pool:
results = pool.map(process_chunk_worker, chunks)
return results
def read_chunks(filepath, chunk_size):
"""Generator для чтения блоков"""
with open(filepath, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
def process_chunk_worker(chunk):
# Эта функция выполняется в отдельном процессе
return len(chunk)
# Используем multiprocessing только для CPU-bound операций
# Для I/O-bound используем asyncio
7. Асинхронная обработка (Async)
import asyncio
async def read_file_async(filepath, chunk_size=8192):
"""Асинхронное чтение файла"""
loop = asyncio.get_event_loop()
def read_chunks():
with open(filepath, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
for chunk in read_chunks():
# Обрабатываем в event loop
await process_chunk_async(chunk)
async def process_chunk_async(chunk):
# Асинхронная обработка (например, отправка в БД)
await asyncio.sleep(0.1) # Имитация I/O операции
print(f'Processed {len(chunk)} bytes')
# Использование
asyncio.run(read_file_async('huge_file.bin'))
8. Memory Mapping (mmap)
import mmap
def process_large_file_mmap(filepath):
"""Использует memory mapping для доступа к файлу"""
with open(filepath, 'rb') as f:
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmapped:
# mmapped выглядит как файл в памяти
# Но ОС управляет кешированием автоматически
# Можно искать в файле
pattern = b'search_pattern'
index = mmapped.find(pattern)
# Можно читать куски
chunk = mmapped[0:1024] # Первый килобайт
# Можно итерировать по строкам
for line in iter(mmapped.readline, b''):
process_line(line)
def process_line(line):
pass
# mmap хорош для случайного доступа к большим файлам
# ОС кеширует часто используемые блоки автоматически
9. Обработка последовательно vs параллельно
import time
from typing import List
class FileProcessor:
def process_sequentially(self, filepath, chunk_size=8192):
"""Последовательная обработка (I/O bound)"""
start = time.time()
with open(filepath, 'rb') as f:
while chunk := f.read(chunk_size):
# I/O операция (медленная)
self.save_to_db(chunk) # Последовательно
return time.time() - start
def process_parallel(self, filepath, chunk_size=8192):
"""Параллельная обработка (CPU bound)"""
from concurrent.futures import ThreadPoolExecutor
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
with open(filepath, 'rb') as f:
while chunk := f.read(chunk_size):
# Запускаем обработку в другом потоке
executor.submit(self.process_chunk, chunk)
return time.time() - start
def save_to_db(self, chunk):
# Имитация I/O операции
time.sleep(0.01)
def process_chunk(self, chunk):
# CPU-интенсивная операция
return len(chunk) ** 2
# Рекомендация:
# - Для I/O: async/await или ThreadPoolExecutor
# - Для CPU: multiprocessing.Pool
# - Для простого случая: просто читай потоком
10. Практический пример: Log файл анализ
from collections import Counter
from datetime import datetime
def analyze_huge_log_file(filepath):
"""Анализирует многогигабайтный лог файл"""
error_counts = Counter()
hourly_stats = Counter()
with open(filepath, 'r', encoding='utf-8') as f:
for line in f: # Читаем строка за строкой
try:
# Парсим лог
timestamp, level, message = parse_log_line(line)
# Подсчитываем ошибки
if level == 'ERROR':
error_counts[message] += 1
# Подсчитываем по часам
hour = datetime.fromisoformat(timestamp).hour
hourly_stats[hour] += 1
except ValueError:
# Пропускаем некорректные строки
continue
return {
'top_errors': error_counts.most_common(10),
'hourly': dict(hourly_stats)
}
def parse_log_line(line):
# Простой парсинг
parts = line.split(' | ')
return parts[0], parts[1], parts[2]
# Использование
results = analyze_huge_log_file('/var/log/app.log')
print(results)
Сравнение методов
"""
┌──────────────────┬────────────┬─────────────┬──────────────┐
│ Метод │ Память │ Скорость │ Применение │
├──────────────────┼────────────┼─────────────┼──────────────┤
│ Потоком (for) │ O(1) │ Базовая │ Текстовые │
│ Chunks │ O(chunk) │ Быстро │ Бинарные │
│ Generator │ O(1) │ Модерные │ Обработка │
│ mmap │ OS управл. │ Быстро │ Случайный │
│ Параллель │ O(workers) │ Очень быст. │ CPU/IO bound │
│ Async │ O(1) │ Очень быст. │ Сеть/БД │
│ Pandas chunks │ O(chunk) │ Умеренно │ CSV/JSON │
└──────────────────┴────────────┴─────────────┴──────────────┘
"""
Заключение
Для файлов больше ОЗУ используй:
- По умолчанию: чтение построчно/по блокам
- Для performance: multiprocessing (CPU) или async (I/O)
- Для случайного доступа: mmap
- Для structured data: pandas.read_csv с chunks
Главное правило: читай потоком, не загружай все в память. Python это делает хорошо из коробки.