← Назад к вопросам

Как прочитать файл, размер которого больше размера оперативной памяти?

1.6 Junior🔥 171 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как прочитать файл, размер которого больше размера оперативной памяти

Это классическая задача, которая встает в production системах. За 10+ лет я обрабатывал петабайты данных, используя эффективные техники потоковой обработки.

1. Поточная Обработка (Streaming) — основной подход

Читаем файл кусками вместо загрузки всего в памяти:

# Способ 1: Чтение по строкам (для текстовых файлов)
def process_large_file(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:  # Python автоматически читает по строкам
            # Обрабатываем одну строку
            process_line(line.strip())

# Это наиболее эффективно — Python читает буфер (обычно 8KB)
# и возвращает строки по одной

def process_line(line):
    # Работаем с одной строкой
    pass

# Сложность: O(n) по времени, O(1) по памяти

Этот метод использует внутренний буфер (обычно 8192 байта), поэтому одна строка не займет всю ОЗУ.

2. Чтение по Блокам (Chunks)

def read_large_file_in_chunks(filepath, chunk_size=8192):
    """Читает файл блоками фиксированного размера"""
    
    with open(filepath, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)  # Читаем блок
            
            if not chunk:  # EOF
                break
            
            # Обрабатываем блок
            process_chunk(chunk)

def process_chunk(chunk: bytes):
    # chunk содержит ровно chunk_size байт (или меньше в конце)
    # Это позволяет обрабатывать файл любого размера
    pass

# Пример: подсчет строк в файле размером 100GB
def count_lines_efficient(filepath):
    total_lines = 0
    with open(filepath, 'rb') as f:
        while chunk := f.read(1024 * 1024):  # 1MB за раз
            total_lines += chunk.count(b'\n')
    return total_lines

# Использование памяти: только 1MB независимо от размера файла

3. Generator для Ленивой Обработки

def read_file_generator(filepath, chunk_size=8192):
    """Generator - ленивое читание файла"""
    with open(filepath, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

# Использование
for chunk in read_file_generator('huge_file.bin'):
    process_chunk(chunk)

# Можно цепировать generators
def read_lines_from_generator(filepath):
    """Генерирует строки из файла"""
    buffer = ""
    for chunk in read_file_generator(filepath):
        buffer += chunk.decode('utf-8', errors='ignore')
        
        lines = buffer.split('\n')
        # Возвращаем все полные строки кроме последней
        for line in lines[:-1]:
            yield line
        
        # Последняя неполная строка для следующей итерации
        buffer = lines[-1]
    
    # Последняя строка
    if buffer:
        yield buffer

# Использование
for line in read_lines_from_generator('huge_file.txt'):
    print(line)

4. Обработка CSV больших размеров

import csv

def process_large_csv(filepath, batch_size=1000):
    """Читает CSV батчами для эффективной обработки"""
    
    with open(filepath, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)  # Нормальные параметры
        
        batch = []
        for row in reader:
            batch.append(row)
            
            if len(batch) >= batch_size:
                # Обрабатываем батч
                process_batch(batch)
                batch = []
        
        # Обрабатываем остаток
        if batch:
            process_batch(batch)

def process_batch(rows):
    # Работаем с небольшим батчем (1000 строк в памяти)
    # Вместо миллионов
    for row in rows:
        # Обработка
        pass

# Альтернатива: использование pandas с chunks
import pandas as pd

def process_large_csv_pandas(filepath, chunksize=10000):
    for chunk in pd.read_csv(filepath, chunksize=chunksize):
        # chunk это DataFrame с 10000 строк
        process_dataframe(chunk)

def process_dataframe(df):
    # Применяем трансформации
    result = df[df['value'] > 100]
    return result

5. Работа с JSON

import json
from ijson import items  # pip install ijson

def process_large_json(filepath):
    """Обрабатывает большой JSON потоком"""
    
    with open(filepath, 'rb') as f:
        # ijson парсит JSON incrementally
        for obj in items(f, 'item'):
            # obj это один элемент из массива
            process_object(obj)

def process_object(obj):
    # Работаем с одним объектом
    pass

# Пример JSON:
# {"items": [{"id": 1, "name": "A"}, {"id": 2, "name": "B"}, ...]}
# ijson читает по одному item, не загружая весь файл

6. Параллельная обработка с multiprocessing

from multiprocessing import Pool
from functools import partial

def process_large_file_parallel(filepath, num_workers=4, chunk_size=8192):
    """Обрабатывает файл параллельно"""
    
    chunks = read_chunks(filepath, chunk_size)
    
    with Pool(processes=num_workers) as pool:
        results = pool.map(process_chunk_worker, chunks)
    
    return results

def read_chunks(filepath, chunk_size):
    """Generator для чтения блоков"""
    with open(filepath, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

def process_chunk_worker(chunk):
    # Эта функция выполняется в отдельном процессе
    return len(chunk)

# Используем multiprocessing только для CPU-bound операций
# Для I/O-bound используем asyncio

7. Асинхронная обработка (Async)

import asyncio

async def read_file_async(filepath, chunk_size=8192):
    """Асинхронное чтение файла"""
    
    loop = asyncio.get_event_loop()
    
    def read_chunks():
        with open(filepath, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                yield chunk
    
    for chunk in read_chunks():
        # Обрабатываем в event loop
        await process_chunk_async(chunk)

async def process_chunk_async(chunk):
    # Асинхронная обработка (например, отправка в БД)
    await asyncio.sleep(0.1)  # Имитация I/O операции
    print(f'Processed {len(chunk)} bytes')

# Использование
asyncio.run(read_file_async('huge_file.bin'))

8. Memory Mapping (mmap)

import mmap

def process_large_file_mmap(filepath):
    """Использует memory mapping для доступа к файлу"""
    
    with open(filepath, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmapped:
            # mmapped выглядит как файл в памяти
            # Но ОС управляет кешированием автоматически
            
            # Можно искать в файле
            pattern = b'search_pattern'
            index = mmapped.find(pattern)
            
            # Можно читать куски
            chunk = mmapped[0:1024]  # Первый килобайт
            
            # Можно итерировать по строкам
            for line in iter(mmapped.readline, b''):
                process_line(line)

def process_line(line):
    pass

# mmap хорош для случайного доступа к большим файлам
# ОС кеширует часто используемые блоки автоматически

9. Обработка последовательно vs параллельно

import time
from typing import List

class FileProcessor:
    def process_sequentially(self, filepath, chunk_size=8192):
        """Последовательная обработка (I/O bound)"""
        start = time.time()
        
        with open(filepath, 'rb') as f:
            while chunk := f.read(chunk_size):
                # I/O операция (медленная)
                self.save_to_db(chunk)  # Последовательно
        
        return time.time() - start
    
    def process_parallel(self, filepath, chunk_size=8192):
        """Параллельная обработка (CPU bound)"""
        from concurrent.futures import ThreadPoolExecutor
        
        start = time.time()
        
        with ThreadPoolExecutor(max_workers=4) as executor:
            with open(filepath, 'rb') as f:
                while chunk := f.read(chunk_size):
                    # Запускаем обработку в другом потоке
                    executor.submit(self.process_chunk, chunk)
        
        return time.time() - start
    
    def save_to_db(self, chunk):
        # Имитация I/O операции
        time.sleep(0.01)
    
    def process_chunk(self, chunk):
        # CPU-интенсивная операция
        return len(chunk) ** 2

# Рекомендация:
# - Для I/O: async/await или ThreadPoolExecutor
# - Для CPU: multiprocessing.Pool
# - Для простого случая: просто читай потоком

10. Практический пример: Log файл анализ

from collections import Counter
from datetime import datetime

def analyze_huge_log_file(filepath):
    """Анализирует многогигабайтный лог файл"""
    
    error_counts = Counter()
    hourly_stats = Counter()
    
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:  # Читаем строка за строкой
            try:
                # Парсим лог
                timestamp, level, message = parse_log_line(line)
                
                # Подсчитываем ошибки
                if level == 'ERROR':
                    error_counts[message] += 1
                
                # Подсчитываем по часам
                hour = datetime.fromisoformat(timestamp).hour
                hourly_stats[hour] += 1
            
            except ValueError:
                # Пропускаем некорректные строки
                continue
    
    return {
        'top_errors': error_counts.most_common(10),
        'hourly': dict(hourly_stats)
    }

def parse_log_line(line):
    # Простой парсинг
    parts = line.split(' | ')
    return parts[0], parts[1], parts[2]

# Использование
results = analyze_huge_log_file('/var/log/app.log')
print(results)

Сравнение методов

"""
┌──────────────────┬────────────┬─────────────┬──────────────┐
│ Метод            │ Память     │ Скорость    │ Применение   │
├──────────────────┼────────────┼─────────────┼──────────────┤
│ Потоком (for)    │ O(1)       │ Базовая     │ Текстовые    │
│ Chunks           │ O(chunk)   │ Быстро      │ Бинарные     │
│ Generator        │ O(1)       │ Модерные    │ Обработка    │
│ mmap             │ OS управл. │ Быстро      │ Случайный    │
│ Параллель        │ O(workers) │ Очень быст. │ CPU/IO bound │
│ Async            │ O(1)       │ Очень быст. │ Сеть/БД      │
│ Pandas chunks    │ O(chunk)   │ Умеренно    │ CSV/JSON     │
└──────────────────┴────────────┴─────────────┴──────────────┘
"""

Заключение

Для файлов больше ОЗУ используй:

  1. По умолчанию: чтение построчно/по блокам
  2. Для performance: multiprocessing (CPU) или async (I/O)
  3. Для случайного доступа: mmap
  4. Для structured data: pandas.read_csv с chunks

Главное правило: читай потоком, не загружай все в память. Python это делает хорошо из коробки.

Как прочитать файл, размер которого больше размера оперативной памяти? | PrepBro