← Назад к вопросам

Какие знаешь варианты работы с большим файлом в Python?

1.7 Middle🔥 121 комментариев
#Python Core#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Работа с большими файлами в Python

Обработка больших файлов требует особых подходов, чтобы не перегрузить оперативную память. Рассмотрим все основные техники и когда их использовать.

1. Чтение по частям (Chunking)

Самый базовый и универсальный метод:

# Чтение файла небольшими блоками
with open('large_file.txt', 'r') as f:
    chunk_size = 8192  # 8 KB за раз
    while True:
        chunk = f.read(chunk_size)
        if not chunk:
            break
        process_chunk(chunk)  # Обработка части

# Пример: подсчет строк в файле 1 GB
def count_lines(filepath):
    count = 0
    with open(filepath, 'r') as f:
        for chunk in iter(lambda: f.read(8192), ''):
            count += chunk.count('\n')
    return count

lines = count_lines('big_file.txt')  # Не загружает весь файл в память

Плюсы: Простота, контроль над размером блока Минусы: Нужно вручную управлять границами (особенно для CSV)

2. Чтение по строкам (Line Iterator)

Для текстовых файлов:

# Итерирование по строкам
with open('large_file.txt', 'r') as f:
    for line in f:
        process_line(line.strip())

# Примеры
def process_large_log_file(filepath):
    errors = 0
    warnings = 0
    with open(filepath, 'r') as f:
        for line in f:
            if "ERROR" in line:
                errors += 1
            elif "WARNING" in line:
                warnings += 1
    return errors, warnings

errors, warnings = process_large_log_file('app.log')
print(f"Ошибок: {errors}, Предупреждений: {warnings}")

# С обработкой вывода
with open('input.txt', 'r') as infile, \
     open('output.txt', 'w') as outfile:
    for line in infile:
        processed = line.upper()  # Обработка
        outfile.write(processed)

Плюсы: Автоматическое разбиение на строки, буферизация Python Минусы: Может быть медленнее, чем explicit chunking

3. Generators — лучший паттерн

Для максимальной эффективности памяти:

# Генератор для обработки CSV
def read_csv_chunks(filepath, chunk_size=1000):
    import csv
    with open(filepath, 'r') as f:
        reader = csv.DictReader(f)
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

# Использование
for rows in read_csv_chunks('users.csv', chunk_size=5000):
    # Обрабатываем по 5000 строк за раз
    process_batch(rows)
    print(f"Обработано {len(rows)} строк")

# Генератор для чтения JSON строк
def read_jsonl(filepath):
    import json
    with open(filepath, 'r') as f:
        for line in f:
            if line.strip():
                yield json.loads(line)

# Использование
for record in read_jsonl('data.jsonl'):
    process_record(record)

Плюсы: Ленивое вычисление, минимум памяти, читаемо Минусы: Нужно понимать механизм generators

4. Pandas для больших данных

Идеален для структурированных данных:

import pandas as pd

# Чтение по частям
for chunk in pd.read_csv('large_file.csv', chunksize=10000):
    # Обработка части DataFrame
    print(f"Обработано {len(chunk)} строк")
    process_dataframe(chunk)

# Примеры
def analyze_large_csv(filepath):
    # Чтение всего файла ПОТЕНЦИАЛЬНО опасно
    # df = pd.read_csv(filepath)  # ❌ Может не хватить памяти
    
    # Правильно — по частям
    total_sum = 0
    for chunk in pd.read_csv(filepath, chunksize=50000):
        total_sum += chunk['amount'].sum()
    return total_sum

total = analyze_large_csv('transactions.csv')
print(f"Общая сумма: {total}")

# Фильтрация во время чтения
for chunk in pd.read_csv('data.csv', chunksize=10000):
    filtered = chunk[chunk['age'] > 18]
    process(filtered)

# Конвертирование типов для экономии памяти
for chunk in pd.read_csv(
    'data.csv',
    chunksize=10000,
    dtype={'id': 'int32', 'status': 'category'}
):
    process(chunk)

Плюсы: Мощная обработка данных, оптимизация типов Минусы: Зависимость, overhead для простых операций

5. Dask — распределенная обработка

Для действительно больших данных (масштабирование за границы оперативной памяти):

import dask.dataframe as dd

# Чтение большого CSV с автоматическим разделением
df = dd.read_csv('huge_file.csv')

# Вычисления отложены до compute()
result = df[df['age'] > 18].groupby('city')['amount'].sum()
final = result.compute()  # Выполнение только здесь

# Обработка в параллель
result = df.map_partitions(lambda df: df * 2).compute()

# Пример: обработка файла 100 GB
def process_huge_dataset():
    df = dd.read_csv('data_*.csv')  # Поддерживает wildcards
    
    # Лениво применяем трансформации
    df['price_usd'] = df['price'] * df['usd_rate']
    
    # Агрегация в параллель
    result = df.groupby('category')['price_usd'].sum()
    
    # Вычисляем в конце
    return result.compute().to_dict()

results = process_huge_dataset()

Плюсы: Параллельная обработка, масштабируемость Минусы: Overhead, сложнее отладка

6. Memory Mapping (mmap) для бинарных файлов

Целое чтение больших файлов, но как если бы они были в памяти:

import mmap
import os

# Чтение бинарного файла через mmap
with open('large_binary_file.bin', 'r+b') as f:
    with mmap.mmap(f.fileno(), 0) as mmapped:
        # Работаем с файлом как с байт-массивом
        chunk = mmapped[0:1024]  # Первые 1024 байта
        
        # Поиск паттерна
        index = mmapped.find(b'pattern')
        
        # Замена
        mmapped[100:105] = b'hello'
        
        # Итерирование
        for i in range(0, len(mmapped), 4096):
            data = mmapped[i:i+4096]
            process_chunk(data)

# Практический пример: обработка огромного JSON
def parse_large_json_stream(filepath):
    with open(filepath, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmapped:
            # Поиск объектов без загрузки всего
            index = 0
            while True:
                start = mmapped.find(b'{', index)
                if start == -1:
                    break
                end = mmapped.find(b'}', start)
                if end == -1:
                    break
                json_str = mmapped[start:end+1].decode()
                yield json.loads(json_str)
                index = end + 1

Плюсы: Эффективно для бинарных файлов, быстрый поиск Минусы: Работает только с бинарными режимами

7. Streaming с requests (для удаленных файлов)

Для больших файлов по интернету:

import requests

# Скачивание больших файлов
def download_large_file(url, filepath):
    response = requests.get(url, stream=True)
    chunk_size = 8192
    total_size = 0
    
    with open(filepath, 'wb') as f:
        for chunk in response.iter_content(chunk_size=chunk_size):
            if chunk:
                f.write(chunk)
                total_size += len(chunk)
                print(f"Скачано: {total_size} байт")

download_large_file('https://example.com/large.zip', 'local_file.zip')

# Обработка на лету
def process_remote_csv(url):
    response = requests.get(url, stream=True)
    lines = response.iter_lines(decode_unicode=True)
    
    for line in lines:
        if line:
            process_line(line)

process_remote_csv('https://example.com/data.csv')

# С прогресс-баром
from tqdm import tqdm

def download_with_progress(url, filepath):
    response = requests.get(url, stream=True)
    total_size = int(response.headers.get('content-length', 0))
    
    with open(filepath, 'wb') as f, \
         tqdm(total=total_size, unit='B', unit_scale=True) as pbar:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
                pbar.update(len(chunk))

Плюсы: Экономит память, поддержка resume Минусы: Зависит от скорости сети

8. Обработка больших ZIP архивов

Без полной распаковки:

import zipfile

# Чтение файла прямо из архива
def process_zip_contents(zippath):
    with zipfile.ZipFile(zippath, 'r') as zf:
        for filename in zf.namelist():
            with zf.open(filename) as f:
                # Обработка файла из архива
                for line in f:
                    process_line(line.decode('utf-8'))

process_zip_contents('data.zip')

# Извлечение с потоковой обработкой
def extract_and_process(zippath, output_dir):
    with zipfile.ZipFile(zippath, 'r') as zf:
        for info in zf.infolist():
            if not info.is_dir():
                # Извлекаем по одному файлу
                zf.extract(info, output_dir)
                filepath = os.path.join(output_dir, info.filename)
                process_file(filepath)  # Обработка
                os.remove(filepath)  # Удаляем после обработки

extract_and_process('large.zip', '/tmp')

Плюсы: Эффективно для сжатых файлов Минусы: Может быть медленнее из-за распаковки

9. SQLite для больших данных

Если нужна индексация и запросы:

import sqlite3

# Импорт CSV в SQLite
def csv_to_sqlite(csv_file, db_file):
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    
    # Создание таблицы
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS data (
            id INTEGER PRIMARY KEY,
            name TEXT,
            amount REAL
        )
    ''')
    
    # Вставка данных партиями
    import csv
    with open(csv_file, 'r') as f:
        reader = csv.DictReader(f)
        batch = []
        for row in reader:
            batch.append((row['id'], row['name'], float(row['amount'])))
            if len(batch) >= 10000:
                cursor.executemany(
                    'INSERT INTO data VALUES (?, ?, ?)',
                    batch
                )
                conn.commit()
                batch = []
        
        if batch:
            cursor.executemany('INSERT INTO data VALUES (?, ?, ?)', batch)
            conn.commit()
    
    # Создание индекса
    cursor.execute('CREATE INDEX idx_name ON data(name)')
    conn.commit()
    conn.close()

# Запросы к большим данным
def query_large_data(db_file, amount_threshold):
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    
    # Используем курсор для потокового чтения
    cursor.execute(
        'SELECT name, amount FROM data WHERE amount > ? ORDER BY amount DESC',
        (amount_threshold,)
    )
    
    batch_size = 1000
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        process_rows(rows)
    
    conn.close()

Плюсы: Быстрые запросы, индексация Минусы: Нужна подготовка данных

10. Сравнение подходов

ПодходРазмер файлаСкоростьПамятьСложность
read() chunks< 1 GBХорошоОтличнаяНизкая
Line iterator< 1 GBОчень хорошоОтличнаяНизкая
Generators< 5 GBОтличнаяОтличнаяСредняя
Pandas chunking< 10 GBХорошоХорошаяСредняя
Dask> 10 GBОтличнаяОтличнаяВысокая
Memory mappingЛюбой размерОтличнаяОтличнаяСредняя
SQLite< 100 GBОтличная (с индексами)ХорошаяСредняя

11. Полный пример: обработка 5GB логов

def analyze_huge_log_file(filepath):
    stats = {
        'total_lines': 0,
        'errors': 0,
        'warnings': 0,
        'errors_by_module': {},
    }
    
    # Генератор для обработки по частям
    def read_log_lines():
        with open(filepath, 'r') as f:
            for line in f:
                yield line.strip()
    
    # Обработка
    for line in read_log_lines():
        stats['total_lines'] += 1
        
        if 'ERROR' in line:
            stats['errors'] += 1
            # Извлечение модуля
            try:
                module = line.split('[')[1].split(']')[0]
                stats['errors_by_module'][module] = \
                    stats['errors_by_module'].get(module, 0) + 1
            except:
                pass
        elif 'WARNING' in line:
            stats['warnings'] += 1
        
        # Периодический вывод прогресса
        if stats['total_lines'] % 1000000 == 0:
            print(f"Обработано {stats['total_lines']} строк")
    
    return stats

stats = analyze_huge_log_file('/var/log/app.log')
print(f"Всего строк: {stats['total_lines']}")
print(f"Ошибок: {stats['errors']}")
print(f"Ошибок по модулям: {stats['errors_by_module']}")

Итог: Для больших файлов используй:

  1. < 1 GB: Line iterator или chunking
  2. 1-10 GB: Generators или Pandas chunks
  3. 10-100 GB: Dask или SQLite
  4. > 100 GB: Распределенные системы (Spark, Hadoop) Никогда не загружай весь файл в память одной операцией!
Какие знаешь варианты работы с большим файлом в Python? | PrepBro