← Назад к вопросам
Есть ли у тебя опыт в разработке real-time приложений или работе со streaming?
1.2 Junior🔥 111 комментариев
#Асинхронность и многопоточность#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Опыт разработки real-time приложений и streaming
Да, у меня есть значительный опыт в разработке real-time приложений и работе с потоковыми данными. Рассмотрю основные области и технологии, которые я использовал.
1. WebSocket для real-time коммуникации
Я активно работал с WebSocket для двусторонней коммуникации в реальном времени:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import asyncio
import json
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: dict):
"""Отправить сообщение всем подключённым клиентам"""
for connection in self.active_connections:
try:
await connection.send_json(message)
except Exception as e:
print(f"Ошибка при отправке: {e}")
manager = ConnectionManager()
@app.websocket("/ws/notifications")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
# Получаем данные от клиента
data = await websocket.receive_json()
# Обрабатываем и отправляем ответ всем
response = {
"type": "message",
"content": data.get("message"),
"timestamp": datetime.now().isoformat()
}
await manager.broadcast(response)
except WebSocketDisconnect:
manager.disconnect(websocket)
2. Streaming HTTP ответов (Server-Sent Events)
Для push-уведомлений использовал Server-Sent Events (SSE):
from fastapi.responses import StreamingResponse
import asyncio
@app.get("/stream/data")
async def stream_data():
async def generate():
for i in range(100):
# Отправляем данные по одному
yield f"data: {{\"value\": {i}, \"timestamp\": \"{datetime.now().isoformat()}\"}}
"
await asyncio.sleep(1) # Интервал между сообщениями
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
# Frontend
"""<script>
const eventSource = new EventSource('/stream/data');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Получены данные:', data);
};
</script>"""
3. Apache Kafka для потоковой обработки
Рабочий с Kafka в высоконагруженных системах:
from kafka import KafkaProducer, KafkaConsumer
import json
import threading
from typing import Callable
class KafkaStreamProcessor:
def __init__(self, bootstrap_servers: list[str]):
self.bootstrap_servers = bootstrap_servers
def produce_events(self, topic: str, events: list[dict]):
"""Отправить события в Kafka"""
producer = KafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
for event in events:
producer.send(topic, value=event)
producer.flush()
producer.close()
def consume_stream(
self,
topic: str,
process_fn: Callable,
group_id: str = "app-group"
):
"""Потреблять события из Kafka"""
consumer = KafkaConsumer(
topic,
bootstrap_servers=self.bootstrap_servers,
group_id=group_id,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=True
)
try:
for message in consumer:
# Обработка события
process_fn(message.value)
finally:
consumer.close()
# Использование
kafka = KafkaStreamProcessor(['localhost:9092'])
# Отправка
events = [
{"user_id": 1, "action": "login"},
{"user_id": 2, "action": "purchase"},
]
kafka.produce_events("user-events", events)
# Потребление
def process_event(event):
print(f"Обработано событие: {event}")
# Здесь может быть сложная логика обработки
kafka.consume_stream(
"user-events",
process_event,
group_id="analytics-group"
)
4. Redis Pub/Sub для real-time уведомлений
import redis
import threading
from typing import Callable
class RedisRealtimeNotifications:
def __init__(self, redis_host: str = 'localhost', port: int = 6379):
self.redis_client = redis.Redis(host=redis_host, port=port, decode_responses=True)
self.pubsub = self.redis_client.pubsub()
def publish_event(self, channel: str, message: dict):
"""Опубликовать событие"""
import json
self.redis_client.publish(channel, json.dumps(message))
def subscribe_to_events(
self,
channels: list[str],
callback: Callable
):
"""Подписаться на события в отдельном потоке"""
self.pubsub.subscribe(channels)
def listen():
import json
for message in self.pubsub.listen():
if message['type'] == 'message':
try:
data = json.loads(message['data'])
callback(message['channel'], data)
except Exception as e:
print(f"Ошибка обработки: {e}")
listener_thread = threading.Thread(target=listen, daemon=True)
listener_thread.start()
# Использование в FastAPI
from fastapi import FastAPI
app = FastAPI()
notifications = RedisRealtimeNotifications()
@app.on_event("startup")
async def startup():
# Подписываемся на события при запуске
def handle_notification(channel: str, message: dict):
print(f"Канал {channel}: {message}")
notifications.subscribe_to_events(
["orders", "payments", "notifications"],
handle_notification
)
@app.post("/order")
async def create_order(order_data: dict):
# Создаём заказ
order = {"id": 123, "status": "created", **order_data}
# Публикуем событие
notifications.publish_event("orders", order)
return order
5. Async/await для concurrent обработки
Использовал asyncio для обработки множественных concurrent операций:
import asyncio
from datetime import datetime
class RealtimeDataProcessor:
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.processed_count = 0
async def process_single_stream(self, stream_id: int):
"""Обработать один поток данных"""
async with self.semaphore:
try:
while True:
# Имитация получения данных в real-time
data = await self.fetch_stream_data(stream_id)
await self.process_data(data)
self.processed_count += 1
await asyncio.sleep(0.1)
except asyncio.CancelledError:
print(f"Stream {stream_id} остановлен")
async def fetch_stream_data(self, stream_id: int):
"""Получить данные из потока"""
await asyncio.sleep(0.01) # Имитация сетевой задержки
return {
"stream_id": stream_id,
"timestamp": datetime.now().isoformat(),
"value": 42
}
async def process_data(self, data: dict):
"""Обработать данные"""
await asyncio.sleep(0.05) # Обработка
print(f"Обработано: {data}")
async def run(self, num_streams: int = 5, duration_seconds: int = 60):
"""Запустить обработку нескольких потоков"""
# Создаём задачи для каждого потока
tasks = [
asyncio.create_task(self.process_single_stream(i))
for i in range(num_streams)
]
# Ждём N секунд, потом отменяем
await asyncio.sleep(duration_seconds)
for task in tasks:
task.cancel()
# Ждём завершения
await asyncio.gather(*tasks, return_exceptions=True)
print(f"Всего обработано: {self.processed_count}")
# Использование
processor = RealtimeDataProcessor(max_concurrent=10)
asyncio.run(processor.run(num_streams=10, duration_seconds=30))
6. WebRTC для real-time видео/аудио
Также работал с WebRTC для видео-конференций:
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiohttp import web
import json
pcs = set()
async def websocket_handler(request):
"""Обработчик WebSocket для WebRTC сигнализации"""
ws = web.WebSocketResponse()
await ws.prepare(request)
pc = RTCPeerConnection()
pcs.add(pc)
@pc.on("connection_state_change")
async def on_connection_state_change():
print(f"Состояние подключения: {pc.connection_state}")
if pc.connection_state == "closed":
pcs.discard(pc)
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
data = json.loads(msg.data)
if data["type"] == "offer":
# Получили предложение SDP
offer = RTCSessionDescription(
sdp=data["sdp"],
type=data["type"]
)
await pc.setRemoteDescription(offer)
# Отправляем ответ
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
await ws.send_json({
"type": "answer",
"sdp": pc.localDescription.sdp
})
return ws
app = web.Application()
app.router.add_get('/ws', websocket_handler)
7. Мониторинг и метрики real-time систем
Использовал Prometheus для сбора метрик:
from prometheus_client import Counter, Histogram, Gauge
import time
# Определяем метрики
processed_events = Counter(
'events_processed_total',
'Всего обработано событий',
['source']
)
processing_time = Histogram(
'event_processing_seconds',
'Время обработки события',
buckets=[0.01, 0.05, 0.1, 0.5, 1.0]
)
active_connections = Gauge(
'active_websocket_connections',
'Активные WebSocket соединения'
)
def process_event_with_metrics(event: dict, source: str):
start_time = time.time()
active_connections.inc()
try:
# Обработка
time.sleep(0.1)
processed_events.labels(source=source).inc()
finally:
duration = time.time() - start_time
processing_time.observe(duration)
active_connections.dec()
8. Шаблон для высоконагруженных real-time систем
from dataclasses import dataclass
from asyncio import Queue
import logging
@dataclass
class Event:
id: str
type: str
timestamp: str
data: dict
class RealtimeApplication:
def __init__(self, worker_count: int = 4):
self.event_queue: Queue[Event] = Queue()
self.worker_count = worker_count
self.logger = logging.getLogger(__name__)
async def enqueue_event(self, event: Event):
"""Добавить событие в очередь"""
await self.event_queue.put(event)
async def worker(self):
"""Рабочий процесс для обработки событий"""
while True:
try:
event = await asyncio.wait_for(
self.event_queue.get(),
timeout=30
)
await self.handle_event(event)
except asyncio.TimeoutError:
# Очередь пуста, можем делать cleanup
pass
async def handle_event(self, event: Event):
"""Обработать событие"""
self.logger.info(f"Обработка события: {event.id}")
# Специфичная логика обработки
async def run(self):
"""Запустить приложение"""
workers = [
asyncio.create_task(self.worker())
for _ in range(self.worker_count)
]
await asyncio.gather(*workers)
Выводы
У меня есть практический опыт с:
- WebSocket для двусторонней real-time коммуникации
- Server-Sent Events для push-уведомлений
- Apache Kafka для потоковой обработки данных
- Redis Pub/Sub для микросервисного взаимодействия
- Async/await для concurrent обработки множественных операций
- WebRTC для видео/аудио приложений
- Prometheus для мониторинга real-time систем
Это позволяет мне разрабатывать масштабируемые приложения, обрабатывающие тысячи событий в секунду.