← Назад к вопросам

Как адаптировать модель данных под новые требования (например, реферальную программу)?

3.0 Senior🔥 101 комментариев
#ETL и качество данных#SQL и базы данных#Архитектура и проектирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Адаптация модели данных под реферальную программу

1. Анализ требований

Первый шаг — понять бизнес-логику реферальной программы:

  • Кто такой реферер? — пользователь, приглашающий других
  • Кто такой рефери? — новый пользователь, пришедший по ссылке реферера
  • Какие бонусы? — за регистрацию, первую покупку, определённую сумму
  • Сколько уровней? — одноуровневая (рефери даёт бонус реферу), многоуровневая (MLM)
  • Какие метрики? — count рефери, total earned, status (active/pending/rejected)

2. Базовая структура данных

Текущая модель (before):

CREATE TABLE users (
    id BIGINT PRIMARY KEY,
    email VARCHAR(255) UNIQUE,
    created_at TIMESTAMP,
    status VARCHAR(50)
);

Новая модель (after):

Добавляем таблицу referrals для отслеживания связей:

-- Таблица реферальных связей
CREATE TABLE referrals (
    id BIGINT PRIMARY KEY,
    referrer_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    referral_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    referral_code VARCHAR(50) NOT NULL UNIQUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status VARCHAR(50) DEFAULT 'pending',  -- pending, active, rejected
    bonus_amount DECIMAL(10, 2) DEFAULT 0,
    bonus_granted_at TIMESTAMP,
    UNIQUE (referrer_id, referral_id)
);

CREATE INDEX idx_referrer ON referrals(referrer_id);
CREATE INDEX idx_referral ON referrals(referral_id);
CREATE INDEX idx_code ON referrals(referral_code);

Денормализация для быстрого доступа к метрикам:

-- Кэш статистики по реферерам
CREATE TABLE referrer_stats (
    user_id BIGINT PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
    total_referrals INT DEFAULT 0,
    active_referrals INT DEFAULT 0,
    total_earned DECIMAL(10, 2) DEFAULT 0,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

3. Логика прохождения рефери

Опишем state machine рефери:

from enum import Enum
from datetime import datetime

class ReferralStatus(Enum):
    PENDING = "pending"          # Рефери зарегистрировался
    QUALIFIED = "qualified"      # Выполнил условие (первая покупка)
    ACTIVE = "active"            # Бонус выдан
    REJECTED = "rejected"        # Не выполнил условие

class ReferralEvent:
    def __init__(self, referral_id, event_type, amount=None):
        self.referral_id = referral_id
        self.event_type = event_type  # 'signup', 'first_purchase', 'reject'
        self.amount = amount
        self.timestamp = datetime.utcnow()

4. ETL процесс обновления модели

Нужно миграция данных без downtime:

-- Шаг 1: Создать новые таблицы
CREATE TABLE referrals_new (
    id BIGINT PRIMARY KEY,
    referrer_id BIGINT NOT NULL,
    referral_id BIGINT NOT NULL,
    referral_code VARCHAR(50) NOT NULL UNIQUE,
    created_at TIMESTAMP,
    status VARCHAR(50),
    bonus_amount DECIMAL(10, 2),
    FOREIGN KEY (referrer_id) REFERENCES users(id),
    FOREIGN KEY (referral_id) REFERENCES users(id)
);

-- Шаг 2: Если есть старые данные, мигрируем
INSERT INTO referrals_new (id, referrer_id, referral_id, referral_code, created_at, status)
SELECT 
    ROW_NUMBER() OVER (ORDER BY referrer_id),
    referrer_id,
    referral_id,
    CONCAT('REF-', referral_id, '-', FLOOR(RANDOM() * 1000000)),
    CURRENT_TIMESTAMP,
    'pending'
FROM legacy_referrals;

-- Шаг 3: Переименовываем (во время maintenance window)
ALTER TABLE referrals RENAME TO referrals_old;
ALTER TABLE referrals_new RENAME TO referrals;

5. Расчёт метрик

Чтобы избежать медленных агрегирующих запросов, используем материализованное представление:

-- Материализованное представление (fast queries)
CREATE MATERIALIZED VIEW referrer_metrics AS
SELECT 
    r.referrer_id,
    COUNT(*) FILTER (WHERE r.status = 'active') as active_count,
    COUNT(*) FILTER (WHERE r.status = 'pending') as pending_count,
    SUM(r.bonus_amount) FILTER (WHERE r.status = 'active') as total_earned,
    MAX(r.bonus_granted_at) as last_bonus_date
FROM referrals r
GROUP BY r.referrer_id;

-- Индекс для быстрого доступа
CREATE INDEX idx_referrer_metrics ON referrer_metrics(referrer_id);

-- Обновление по расписанию (Cron job / Airflow)
REFRESH MATERIALIZED VIEW CONCURRENTLY referrer_metrics;

6. Pipeline обработки рефералов

Apache Airflow DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_pending_referrals():
    """Обработать pending статусы: проверить условия, выдать бонусы"""
    query = """
    UPDATE referrals r
    SET status = 'active', bonus_granted_at = NOW()
    WHERE r.status = 'pending'
        AND r.referral_id IN (
            SELECT user_id FROM users_with_first_purchase
            WHERE first_purchase_date > r.created_at
        )
    """
    execute_query(query)

def update_referrer_stats():
    """Обновить кэш статистики"""
    query = """
    REFRESH MATERIALIZED VIEW CONCURRENTLY referrer_metrics;
    """
    execute_query(query)

def handle_rejections():
    """Отклонить рефери, если прошло 30 дней и нет условий"""
    query = """
    UPDATE referrals
    SET status = 'rejected'
    WHERE status = 'pending'
        AND created_at < NOW() - INTERVAL 30 DAY
        AND referral_id NOT IN (SELECT user_id FROM paid_users);
    """
    execute_query(query)

with DAG('referral_processing', schedule_interval='@hourly') as dag:
    process_task = PythonOperator(
        task_id='process_pending',
        python_callable=process_pending_referrals
    )
    
    stats_task = PythonOperator(
        task_id='update_stats',
        python_callable=update_referrer_stats
    )
    
    reject_task = PythonOperator(
        task_id='handle_rejections',
        python_callable=handle_rejections
    )
    
    process_task >> stats_task >> reject_task

7. Качество данных

Добавляем проверки data quality:

from great_expectations.dataset import PandasDataset

def validate_referrals(df):
    assert df['bonus_amount'].notna().all(), "bonus_amount должен быть заполнен"
    assert (df['bonus_amount'] >= 0).all(), "bonus_amount не может быть отрицательным"
    assert df.duplicated(subset=['referrer_id', 'referral_id']).sum() == 0, "Нет дубликатов"
    assert (df['referrer_id'] != df['referral_id']).all(), "Реферер != рефери"
    print("✓ Все проверки пройдены")

8. Мониторинг

Ключевые метрики для дашборда:

  • Daily активные рефери
  • Conversion rate (рефери → активный)
  • Average bonus per referrer
  • Top 100 referrers
  • Failed/Rejected referrals count

Ошибки, которых избежать

  1. Не денормализуй без причины — referrer_stats нужна, потому что COUNT(*) на миллионах строк медленная
  2. Не забывай about cascading deletes — если удалить пользователя, referrals должны удалиться или учитывать NULL
  3. Не выполняй UPDATE для миллионов строк синхронно — это заблокирует БД
  4. Не доверяй только БД-логике — приложение должно валидировать бизнес-правила

Заключение

Адаптация модели данных — это итеративный процесс: сначала понимаешь требования, потом проектируешь schema, затем мигрируешь данные без downtime, и наконец автоматизируешь обработку через pipelines с мониторингом качества.