Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Самый сложный проект — микросервис обработки видеопотоков
Я разработал систему для обработки видеопотоков в реальном времени на основе Python и асинхронных технологий.
Задача
Построить масштабируемую систему, которая:
- Принимает видеопотоки от множества источников (RTMP, HLS)
- Обрабатывает видео в реальном времени (детекция лиц, распознавание объектов)
- Отправляет результаты в базу данных
- Масштабируется горизонтально
Сложности и решения
1. Асинхронная обработка
Проблема: видеопотоки нельзя обрабатывать последовательно — пропускная способность была бы мала.
Решение: использовал FastAPI с asyncio, Redis для очереди задач, Celery для распределённой обработки.
from fastapi import FastAPI
from celery import Celery
import asyncio
app = FastAPI()
celery_app = Celery('video_processor')
@app.post('/process-stream')
async def process_stream(stream_url: str):
# Отправляем в очередь Celery
task = celery_app.send_task(
'tasks.process_video',
args=(stream_url,)
)
return {'task_id': task.id}
2. Управление памятью
Проблема: видеоархив требует большого объёма памяти. При обработке множества потоков падала память.
Решение: использовал контекстные менеджеры, объектные пулы, и streaming обработку вместо загрузки всего видео в память.
from contextlib import contextmanager
@contextmanager
def video_frame_loader(url, batch_size=32):
cap = cv2.VideoCapture(url)
try:
frames = []
while True:
ret, frame = cap.read()
if not ret:
break
frames.append(frame)
if len(frames) == batch_size:
yield frames
frames = []
finally:
cap.release()
3. Concurrency и Race Conditions
Проблема: множество воркеров писали в БД одновременно, возникали deadlocks и потеря данных.
Решение: использовал PostgreSQL с SELECT FOR UPDATE, асинхронные драйверы, и правильную изоляцию транзакций.
async def save_detection_result(frame_id: int, detections: list):
async with async_session() as session:
async with session.begin():
# Блокируем строку для избежания race conditions
result = await session.execute(
select(FrameResult).with_for_update()
.where(FrameResult.id == frame_id)
)
4. Масштабируемость и нагрузка
Проблема: при 50+ потоков система падала, было узкое место в обработке.
Решение: разделил на микросервисы:
- API gateway (FastAPI)
- Worker сервис (Celery)
- Stream manager (Redis)
- Results storage (PostgreSQL)
Каждый сервис масштабировался независимо.
5. Мониторинг и логирование
Проблема: при сбое было непонятно, на каком этапе произошла ошибка.
Решение: использовал Prometheus для метрик, ELK Stack для логов, distributed tracing с Jaeger.
from prometheus_client import Counter, Histogram
processing_time = Histogram('video_processing_seconds', 'Time to process video')
error_counter = Counter('processing_errors_total', 'Total processing errors')
@processing_time.time()
async def process_frame(frame):
try:
# обработка
except Exception as e:
error_counter.inc()
raise
Результаты
- Обрабатывал 100+ потоков одновременно
- Latency — 200-500ms на обработку
- Безошибочная работа 6 месяцев на продакшене
- Масштабировалась с 2 на 10 серверов без изменения кода
Этот проект научил меня работать с асинхронным кодом, микросервисной архитектурой, мониторингом и операционными сложностями реальных систем.