← Назад к вопросам

Как распарсить миллион файлов, в каждом из которых миллион строк?

2.8 Senior🔥 41 комментариев
#Архитектура и паттерны#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Обработка миллиона файлов по миллиону строк в каждом

Это классическая задача на обработку больших данных. Требуется стратегический подход, иначе можно столкнуться с проблемами памяти, производительности и I/O.

1. Оценка масштаба проблемы

# Грубая оценка размера данных
files_count = 1_000_000
lines_per_file = 1_000_000
bytes_per_line = 100  # Среднее значение

total_size_gb = (files_count * lines_per_file * bytes_per_line) / (1024**3)
print(f"Общий размер: {total_size_gb:.0f} GB")
# Примерно 100 ТБ данных

# Это НЕ может быть загружено в памяти на одной машине
# Нужна распределённая обработка

2. Стратегия 1: Потоковая обработка (Streaming)

Басовая идея: обрабатываем данные построчно, не загружая всё в память.

import os
from pathlib import Path

def process_single_file(filepath):
    """Обработка одного файла построчно"""
    results = []
    
    try:
        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
            for line_num, line in enumerate(f, 1):
                # Обработка построчно
                processed = line.strip().split(',')
                
                # Периодический yield для экономии памяти
                if line_num % 10000 == 0:
                    yield results
                    results = []
                
                results.append(processed)
        
        if results:
            yield results
    
    except Exception as e:
        print(f"Ошибка при обработке {filepath}: {e}")

def process_all_files(directory):
    """Обработка всех файлов в директории"""
    for filepath in Path(directory).glob('*.csv'):
        for batch in process_single_file(filepath):
            # Отправить batch на обработку
            yield batch

# Использование
for batch in process_all_files('/data'):
    # Обработка batch'а (например, отправка в БД)
    save_to_database(batch)

3. Стратегия 2: Параллельная обработка (Multiprocessing)

Используем несколько процессов для обработки файлов одновременно.

import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

def process_file_worker(filepath):
    """Worker для обработки одного файла"""
    aggregated = {}
    line_count = 0
    
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            line_count += 1
            data = parse_line(line)
            
            # Агрегирование в памяти для файла
            key = data['category']
            aggregated[key] = aggregated.get(key, 0) + data['value']
    
    return {
        'file': filepath,
        'lines': line_count,
        'data': aggregated
    }

def process_all_files_parallel(directory, num_workers=None):
    """Параллельная обработка всех файлов"""
    if num_workers is None:
        num_workers = multiprocessing.cpu_count()
    
    files = list(Path(directory).glob('*.csv'))
    
    results = []
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # Отправляем файлы на обработку
        futures = [executor.submit(process_file_worker, f) for f in files]
        
        # Обработка результатов по мере их готовности
        for future in futures:
            try:
                result = future.result(timeout=300)  # 5 минут timeout
                results.append(result)
                
                # Периодически сохраняем результаты
                if len(results) % 1000 == 0:
                    save_results_batch(results)
                    results = []
            
            except Exception as e:
                print(f"Ошибка обработки файла: {e}")
    
    if results:
        save_results_batch(results)

4. Стратегия 3: Распределённая обработка (Spark/Dask)

Для действительно больших объёмов используем распределённые фреймворки.

# Вариант 1: Apache Spark через PySpark
from pyspark.sql import SparkSession

df = (SparkSession.builder
      .appName("process_large_files")
      .getOrCreate()
      .read
      .csv('/data/*.csv', header=True, inferSchema=True))

# Параллельная обработка
df_processed = (df
    .rdd
    .map(lambda row: process_row(row))
    .filter(lambda x: x is not None)
    .toDF())

df_processed.write.mode('overwrite').parquet('/output')

# Вариант 2: Dask для параллелизма
import dask.dataframe as dd

df = dd.read_csv('/data/*.csv')
result = df.map_partitions(
    lambda partition: partition.apply(lambda row: process_row(row), axis=1)
).compute()

result.to_csv('/output/result*.csv', index=False)

5. Оптимизация I/O

import io

def optimized_file_reading(filepath, buffer_size=1024*1024):
    """Оптимизированное чтение с большим буфером"""
    with open(filepath, 'r', buffering=buffer_size) as f:
        # Встроенный буфер системы значительно повышает скорость
        for line in f:
            yield line.strip()

# Использование mmap для очень больших файлов
import mmap

def read_with_mmap(filepath):
    """Чтение с использованием memory-mapped files"""
    with open(filepath, 'r') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:
            for line in iter(mmapped_file.readline, b''):
                yield line.decode('utf-8').strip()

6. Практический пример: ETL pipeline

from queue import Queue
from threading import Thread
import logging

class DataProcessingPipeline:
    def __init__(self, input_dir, output_dir, num_workers=4):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.num_workers = num_workers
        self.queue = Queue(maxsize=100)  # Ограничиваем размер очереди
        
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    def producer(self):
        """Поток, который читает файлы"""
        for filepath in Path(self.input_dir).glob('*.csv'):
            self.queue.put(filepath)
        
        # Сигнал завершения
        for _ in range(self.num_workers):
            self.queue.put(None)
    
    def worker(self):
        """Worker поток для обработки файлов"""
        while True:
            filepath = self.queue.get()
            
            if filepath is None:
                break
            
            try:
                self.process_file(filepath)
            except Exception as e:
                self.logger.error(f"Ошибка обработки {filepath}: {e}")
    
    def process_file(self, filepath):
        """Обработка одного файла"""
        output_file = Path(self.output_dir) / f"{filepath.stem}_processed.csv"
        
        with open(filepath, 'r') as f_in, open(output_file, 'w') as f_out:
            for line in f_in:
                processed = self.transform_line(line)
                if processed:
                    f_out.write(processed + '\n')
    
    def transform_line(self, line):
        """Трансформация одной строки"""
        # Логика обработки
        return line.strip().upper()
    
    def run(self):
        """Запуск pipeline"""
        # Запускаем producer
        producer_thread = Thread(target=self.producer, daemon=True)
        producer_thread.start()
        
        # Запускаем workers
        workers = []
        for _ in range(self.num_workers):
            worker_thread = Thread(target=self.worker, daemon=True)
            worker_thread.start()
            workers.append(worker_thread)
        
        # Ждём завершения
        producer_thread.join()
        for worker in workers:
            worker.join()
        
        self.logger.info("Обработка завершена")

# Использование
pipeline = DataProcessingPipeline('/input', '/output', num_workers=8)
pipeline.run()

7. Рекомендации по выбору стратегии

"""
Выбор стратегии в зависимости от масштаба:

1. 1000 файлов по 1M строк:
   - Простая потоковая обработка (Streaming)
   - Один сервер
   - Время: часы

2. 10K файлов по 1M строк:
   - Multiprocessing + Queue
   - Один сервер с несколькими ядрами
   - Время: часы

3. 100K файлов по 1M строк:
   - Dask распределённо или локально
   - Несколько серверов
   - Время: часы

4. 1M файлов по 1M строк (100 ТБ):
   - Apache Spark на кластере
   - 10+ серверов
   - Время: дни
   - Хранилище: S3/HDFS
"""

# Рекомендуемый стек для 1M файлов:
# 1. Хранение: S3 или HDFS
# 2. Обработка: Apache Spark или Google BigQuery
# 3. Управление: Airflow или Kubernetes
# 4. Мониторинг: Prometheus + Grafana

8. Ключевые оптимизации

  • Потоковая обработка: никогда не загружайте всё в памяти
  • Параллелизм: используйте многопроцессность для утилизации всех ядер
  • Батчинг: обрабатывайте данные батч'ами, а не строка за строкой
  • Кеширование: кешируйте результаты промежуточных операций
  • Асинхронность: используйте async I/O для сетевых операций
  • Сжатие: сжимайте данные для снижения I/O
  • Индексирование: используйте индексы для быстрого поиска

Заключение

Для обработки такого объёма данных нужна правильная архитектура:

  • На одной машине: streaming + multiprocessing
  • На кластере: Spark или Dask
  • В облаке: BigQuery, Redshift или похожие сервисы

Не забывайте про мониторинг, логирование и обработку ошибок при таком масштабе!