← Назад к вопросам
Как распарсить миллион файлов, в каждом из которых миллион строк?
2.8 Senior🔥 41 комментариев
#Архитектура и паттерны#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Обработка миллиона файлов по миллиону строк в каждом
Это классическая задача на обработку больших данных. Требуется стратегический подход, иначе можно столкнуться с проблемами памяти, производительности и I/O.
1. Оценка масштаба проблемы
# Грубая оценка размера данных
files_count = 1_000_000
lines_per_file = 1_000_000
bytes_per_line = 100 # Среднее значение
total_size_gb = (files_count * lines_per_file * bytes_per_line) / (1024**3)
print(f"Общий размер: {total_size_gb:.0f} GB")
# Примерно 100 ТБ данных
# Это НЕ может быть загружено в памяти на одной машине
# Нужна распределённая обработка
2. Стратегия 1: Потоковая обработка (Streaming)
Басовая идея: обрабатываем данные построчно, не загружая всё в память.
import os
from pathlib import Path
def process_single_file(filepath):
"""Обработка одного файла построчно"""
results = []
try:
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
# Обработка построчно
processed = line.strip().split(',')
# Периодический yield для экономии памяти
if line_num % 10000 == 0:
yield results
results = []
results.append(processed)
if results:
yield results
except Exception as e:
print(f"Ошибка при обработке {filepath}: {e}")
def process_all_files(directory):
"""Обработка всех файлов в директории"""
for filepath in Path(directory).glob('*.csv'):
for batch in process_single_file(filepath):
# Отправить batch на обработку
yield batch
# Использование
for batch in process_all_files('/data'):
# Обработка batch'а (например, отправка в БД)
save_to_database(batch)
3. Стратегия 2: Параллельная обработка (Multiprocessing)
Используем несколько процессов для обработки файлов одновременно.
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
def process_file_worker(filepath):
"""Worker для обработки одного файла"""
aggregated = {}
line_count = 0
with open(filepath, 'r', encoding='utf-8') as f:
for line in f:
line_count += 1
data = parse_line(line)
# Агрегирование в памяти для файла
key = data['category']
aggregated[key] = aggregated.get(key, 0) + data['value']
return {
'file': filepath,
'lines': line_count,
'data': aggregated
}
def process_all_files_parallel(directory, num_workers=None):
"""Параллельная обработка всех файлов"""
if num_workers is None:
num_workers = multiprocessing.cpu_count()
files = list(Path(directory).glob('*.csv'))
results = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
# Отправляем файлы на обработку
futures = [executor.submit(process_file_worker, f) for f in files]
# Обработка результатов по мере их готовности
for future in futures:
try:
result = future.result(timeout=300) # 5 минут timeout
results.append(result)
# Периодически сохраняем результаты
if len(results) % 1000 == 0:
save_results_batch(results)
results = []
except Exception as e:
print(f"Ошибка обработки файла: {e}")
if results:
save_results_batch(results)
4. Стратегия 3: Распределённая обработка (Spark/Dask)
Для действительно больших объёмов используем распределённые фреймворки.
# Вариант 1: Apache Spark через PySpark
from pyspark.sql import SparkSession
df = (SparkSession.builder
.appName("process_large_files")
.getOrCreate()
.read
.csv('/data/*.csv', header=True, inferSchema=True))
# Параллельная обработка
df_processed = (df
.rdd
.map(lambda row: process_row(row))
.filter(lambda x: x is not None)
.toDF())
df_processed.write.mode('overwrite').parquet('/output')
# Вариант 2: Dask для параллелизма
import dask.dataframe as dd
df = dd.read_csv('/data/*.csv')
result = df.map_partitions(
lambda partition: partition.apply(lambda row: process_row(row), axis=1)
).compute()
result.to_csv('/output/result*.csv', index=False)
5. Оптимизация I/O
import io
def optimized_file_reading(filepath, buffer_size=1024*1024):
"""Оптимизированное чтение с большим буфером"""
with open(filepath, 'r', buffering=buffer_size) as f:
# Встроенный буфер системы значительно повышает скорость
for line in f:
yield line.strip()
# Использование mmap для очень больших файлов
import mmap
def read_with_mmap(filepath):
"""Чтение с использованием memory-mapped files"""
with open(filepath, 'r') as f:
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:
for line in iter(mmapped_file.readline, b''):
yield line.decode('utf-8').strip()
6. Практический пример: ETL pipeline
from queue import Queue
from threading import Thread
import logging
class DataProcessingPipeline:
def __init__(self, input_dir, output_dir, num_workers=4):
self.input_dir = input_dir
self.output_dir = output_dir
self.num_workers = num_workers
self.queue = Queue(maxsize=100) # Ограничиваем размер очереди
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def producer(self):
"""Поток, который читает файлы"""
for filepath in Path(self.input_dir).glob('*.csv'):
self.queue.put(filepath)
# Сигнал завершения
for _ in range(self.num_workers):
self.queue.put(None)
def worker(self):
"""Worker поток для обработки файлов"""
while True:
filepath = self.queue.get()
if filepath is None:
break
try:
self.process_file(filepath)
except Exception as e:
self.logger.error(f"Ошибка обработки {filepath}: {e}")
def process_file(self, filepath):
"""Обработка одного файла"""
output_file = Path(self.output_dir) / f"{filepath.stem}_processed.csv"
with open(filepath, 'r') as f_in, open(output_file, 'w') as f_out:
for line in f_in:
processed = self.transform_line(line)
if processed:
f_out.write(processed + '\n')
def transform_line(self, line):
"""Трансформация одной строки"""
# Логика обработки
return line.strip().upper()
def run(self):
"""Запуск pipeline"""
# Запускаем producer
producer_thread = Thread(target=self.producer, daemon=True)
producer_thread.start()
# Запускаем workers
workers = []
for _ in range(self.num_workers):
worker_thread = Thread(target=self.worker, daemon=True)
worker_thread.start()
workers.append(worker_thread)
# Ждём завершения
producer_thread.join()
for worker in workers:
worker.join()
self.logger.info("Обработка завершена")
# Использование
pipeline = DataProcessingPipeline('/input', '/output', num_workers=8)
pipeline.run()
7. Рекомендации по выбору стратегии
"""
Выбор стратегии в зависимости от масштаба:
1. 1000 файлов по 1M строк:
- Простая потоковая обработка (Streaming)
- Один сервер
- Время: часы
2. 10K файлов по 1M строк:
- Multiprocessing + Queue
- Один сервер с несколькими ядрами
- Время: часы
3. 100K файлов по 1M строк:
- Dask распределённо или локально
- Несколько серверов
- Время: часы
4. 1M файлов по 1M строк (100 ТБ):
- Apache Spark на кластере
- 10+ серверов
- Время: дни
- Хранилище: S3/HDFS
"""
# Рекомендуемый стек для 1M файлов:
# 1. Хранение: S3 или HDFS
# 2. Обработка: Apache Spark или Google BigQuery
# 3. Управление: Airflow или Kubernetes
# 4. Мониторинг: Prometheus + Grafana
8. Ключевые оптимизации
- Потоковая обработка: никогда не загружайте всё в памяти
- Параллелизм: используйте многопроцессность для утилизации всех ядер
- Батчинг: обрабатывайте данные батч'ами, а не строка за строкой
- Кеширование: кешируйте результаты промежуточных операций
- Асинхронность: используйте async I/O для сетевых операций
- Сжатие: сжимайте данные для снижения I/O
- Индексирование: используйте индексы для быстрого поиска
Заключение
Для обработки такого объёма данных нужна правильная архитектура:
- На одной машине: streaming + multiprocessing
- На кластере: Spark или Dask
- В облаке: BigQuery, Redshift или похожие сервисы
Не забывайте про мониторинг, логирование и обработку ошибок при таком масштабе!