← Назад к вопросам

Приведи пример задачи где бы использовал Kafka

1.3 Junior🔥 171 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Практический пример: E-Commerce Analytics Pipeline

Разберу реальную задачу, где Kafka необходима и даёт огромную ценность.

Контекст задачи

Представь крупный онлайн магазин:

  • 10k+ посещений в час
  • Множество микросервисов (Orders, Users, Inventory, Analytics, Notifications)
  • Нужно обрабатывать события в реальном времени
  • Требуется надёжность (данные не должны теряться)

Архитектура БЕЗ Kafka (проблемы)

# ❌ REST API вызовы (синхронные)
@router.post("/orders")
def create_order(order_data: OrderSchema):
    # Создаём заказ
    order = Order.create(**order_data)
    
    # 1. Синхронный вызов к Analytics
    response = requests.post("http://analytics/track", json={"order_id": order.id})
    if response.status_code != 200:
        # Провалился analytics, откатываем заказ?
        raise Exception("Analytics service down")
    
    # 2. Отправляем email
    response = requests.post("http://notification/email", json={"order_id": order.id})
    # Если email медленный, весь запрос зависает
    
    # 3. Обновляем инвентарь
    response = requests.post("http://inventory/update", json={"order_id": order.id})
    # Опять ждём
    
    return order

# Проблемы:
# ❌ Если упадёт Inventory, заказ не создастся
# ❌ Email может быть медленным, всем пользователям медленно
# ❌ Analytics сломана - не можем создавать заказы
# ❌ Нет гарантии доставки события
# ❌ Масштабируемость - каждый микросервис знает о других

Архитектура С Kafka (решение)

┌─────────────────┐
│  OrderService   │
│  (создание)     │
└────────┬────────┘
         │
         │ publishes "order.created"
         v
    ┌─────────┐
    │  Kafka  │
    │ Topics  │
    └────┬────┘
         │
    ┌────┴──────────┬─────────────┐
    │               │             │
    v               v             v
Analytics    Notification    Inventory
(consumer)   (consumer)      (consumer)

Шаг 1: OrderService публикует событие

from kafka import KafkaProducer
import json
from datetime import datetime
from uuid import uuid4

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@router.post("/orders")
def create_order(order_data: OrderSchema):
    # Создаём заказ в БД
    order = Order.create(**order_data)
    
    # Публикуем событие в Kafka
    event = {
        "event_type": "order.created",
        "order_id": str(order.id),
        "user_id": str(order.user_id),
        "amount": float(order.amount),
        "items": order.items,
        "timestamp": datetime.utcnow().isoformat(),
        "event_id": str(uuid4())
    }
    
    # Асинхронно отправляем
    producer.send('order-events', value=event)
    # ✅ Не ждём ответа! Заказ создан, событие в очереди
    
    return {"status": "created", "order_id": order.id}

Преимущества:

  • ✅ Заказ создаётся мгновенно (без ожидания)
  • ✅ Событие гарантированно доставлено в Kafka
  • ✅ Если Analytics упадёт, заказ всё равно создан

Шаг 2: Analytics потребляет события

from kafka import KafkaConsumer
import json
from pymongo import MongoClient

consumer = KafkaConsumer(
    'order-events',
    bootstrap_servers=['kafka:9092'],
    group_id='analytics-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True
)

mongo = MongoClient('mongodb://mongo:27017')
db = mongo['analytics']

# Обрабатываем события
for message in consumer:
    event = message.value
    
    if event['event_type'] == 'order.created':
        # Сохраняем в MongoDB для анализа
        db.orders.insert_one({
            'order_id': event['order_id'],
            'amount': event['amount'],
            'user_id': event['user_id'],
            'created_at': event['timestamp'],
            'metrics': {
                'revenue': event['amount'],
                'items_count': len(event['items'])
            }
        })
        
        # Обновляем дашборды в реальном времени
        update_dashboard_metrics(event)
        
        print(f"Analytics: Заказ {event['order_id']} обработан")

Преимущества:

  • ✅ Analytics работает асинхронно
  • ✅ Может обработать в удобном темпе
  • ✅ Если упадёт, Kafka хранит события
  • ✅ Можно добавить новый Analytics consumer без изменения OrderService

Шаг 3: NotificationService потребляет события

from kafka import KafkaConsumer
from email.mime.text import MIMEText
import smtplib

consumer = KafkaConsumer(
    'order-events',
    bootstrap_servers=['kafka:9092'],
    group_id='notification-group',  # Другая consumer group!
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest'
)

for message in consumer:
    event = message.value
    
    if event['event_type'] == 'order.created':
        # Получаем данные пользователя
        user = User.get(event['user_id'])
        
        # Отправляем email
        subject = f"Order #{event['order_id']} created"
        body = f"""
        Thank you for your order!
        Amount: {event['amount']}
        Items: {len(event['items'])}
        """
        
        send_email(user.email, subject, body)
        
        # Отправляем SMS
        send_sms(user.phone, f"Order confirmed: {event['amount']}")
        
        # Отправляем push-notification
        send_push(user.device_id, "Your order is confirmed")
        
        print(f"Notification: User {event['user_id']} уведомлен")

Преимущества:

  • ✅ Email отправляется асинхронно
  • ✅ Можно добавить SMS и push отдельно
  • ✅ Если email сервер медленный, OrderService не страдает
  • ✅ Разные consumer groups могут обрабатывать одновременно

Шаг 4: InventoryService потребляет события

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'order-events',
    bootstrap_servers=['kafka:9092'],
    group_id='inventory-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest'
)

for message in consumer:
    event = message.value
    
    if event['event_type'] == 'order.created':
        # Уменьшаем количество на складе
        for item in event['items']:
            Product.get(item['product_id']).decrease_stock(item['quantity'])
        
        # Проверяем, нужна ли переупорядочение
        check_low_stock()
        
        # Публикуем новое событие!
        inventory_updated_event = {
            'event_type': 'inventory.updated',
            'order_id': event['order_id'],
            'timestamp': datetime.utcnow().isoformat()
        }
        producer.send('inventory-events', value=inventory_updated_event)
        
        print(f"Inventory: Stock updated for order {event['order_id']}")

Преимущества Kafka в этом примере

1. Слабая связанность

# OrderService не знает о Analytics, Notification, Inventory
# Если добавим новый сервис (например, Recommendation), просто добавим consumer
# OrderService меняться не будет!

2. Масштабируемость

Высокая нагрузка на Analytics?
→ Запустим 5 analytics consumers
→ Kafka распределит события между ними
→ Обработка ускорится в 5 раз

Одного сервиса на 10k заказов в час достаточно:
→ 10000 / 60 = 166 заказов в минуту
→ 1 потребитель может обработать 200-300 в секунду
→ Легко масштабируется

3. Надёжность

# Inventory сломался на 1 час?
# → Kafka хранит события
# → Когда Inventory вернулся, обработает все события из хранилища
# → Данные не потеряны

# Конфигурация retention:
Kafka сохраняет события 7 дней по умолчанию

4. Обработка в реальном времени

# Analytics обновляет дашборд в реальном времени
# Видим метрики как только заказ создан
# Не нужно ждать batch-обработки в конце дня

5. Дебаггинг и аудит

# Все события сохранены в Kafka
# Можем переиграть события для отладки
# Полная история всех операций

# Например, если email не отправился, можем:
# 1. Исправить баг в NotificationService
# 2. Переиграть события с самого начала
# 3. Все пользователи получат email

Тестирование

import pytest
from unittest.mock import patch, MagicMock

def test_order_created_event():
    """Тест что event попадает в Kafka"""
    with patch('kafka.KafkaProducer') as mock_producer:
        response = client.post("/orders", json={
            "user_id": "123",
            "items": [{"product_id": "1", "quantity": 2}],
            "amount": 100
        })
        
        assert response.status_code == 200
        
        # Проверяем что event был отправлен
        mock_producer.return_value.send.assert_called_once()
        call_args = mock_producer.return_value.send.call_args
        
        assert call_args[0][0] == 'order-events'  # Topic
        event = call_args[1]['value']
        assert event['event_type'] == 'order.created'
        assert event['amount'] == 100

Когда НЕ использовать Kafka

  • ❌ Простой монолит с 10 запросами в час
  • ❌ Требуется синхронный ответ (например, заказ должен быть сразу подтвержден)
  • ❌ Нет опыта с распределёнными системами
  • ❌ Небольшая команда (добавляет сложность)

Вывод

Kafka идеальна для:

  • Асинхронной обработки событий
  • Слабо связанных микросервисов
  • High-load систем (100k+ событий в час)
  • Систем требующих надёжность доставки

В примере с e-commerce, Kafka позволяет нам:

  1. Создавать заказы мгновенно
  2. Обрабатывать Analytics, уведомления, инвентарь независимо
  3. Масштабировать каждый сервис отдельно
  4. Гарантировать доставку событий
  5. Легко добавлять новые сервисы
Приведи пример задачи где бы использовал Kafka | PrepBro