← Назад к вопросам
Как адаптировать модель данных под новые требования (например, реферальную программу)?
3.0 Senior🔥 101 комментариев
#ETL и качество данных#SQL и базы данных#Архитектура и проектирование
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Адаптация модели данных под реферальную программу
1. Анализ требований
Первый шаг — понять бизнес-логику реферальной программы:
- Кто такой реферер? — пользователь, приглашающий других
- Кто такой рефери? — новый пользователь, пришедший по ссылке реферера
- Какие бонусы? — за регистрацию, первую покупку, определённую сумму
- Сколько уровней? — одноуровневая (рефери даёт бонус реферу), многоуровневая (MLM)
- Какие метрики? — count рефери, total earned, status (active/pending/rejected)
2. Базовая структура данных
Текущая модель (before):
CREATE TABLE users (
id BIGINT PRIMARY KEY,
email VARCHAR(255) UNIQUE,
created_at TIMESTAMP,
status VARCHAR(50)
);
Новая модель (after):
Добавляем таблицу referrals для отслеживания связей:
-- Таблица реферальных связей
CREATE TABLE referrals (
id BIGINT PRIMARY KEY,
referrer_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
referral_id BIGINT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
referral_code VARCHAR(50) NOT NULL UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status VARCHAR(50) DEFAULT 'pending', -- pending, active, rejected
bonus_amount DECIMAL(10, 2) DEFAULT 0,
bonus_granted_at TIMESTAMP,
UNIQUE (referrer_id, referral_id)
);
CREATE INDEX idx_referrer ON referrals(referrer_id);
CREATE INDEX idx_referral ON referrals(referral_id);
CREATE INDEX idx_code ON referrals(referral_code);
Денормализация для быстрого доступа к метрикам:
-- Кэш статистики по реферерам
CREATE TABLE referrer_stats (
user_id BIGINT PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
total_referrals INT DEFAULT 0,
active_referrals INT DEFAULT 0,
total_earned DECIMAL(10, 2) DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
3. Логика прохождения рефери
Опишем state machine рефери:
from enum import Enum
from datetime import datetime
class ReferralStatus(Enum):
PENDING = "pending" # Рефери зарегистрировался
QUALIFIED = "qualified" # Выполнил условие (первая покупка)
ACTIVE = "active" # Бонус выдан
REJECTED = "rejected" # Не выполнил условие
class ReferralEvent:
def __init__(self, referral_id, event_type, amount=None):
self.referral_id = referral_id
self.event_type = event_type # 'signup', 'first_purchase', 'reject'
self.amount = amount
self.timestamp = datetime.utcnow()
4. ETL процесс обновления модели
Нужно миграция данных без downtime:
-- Шаг 1: Создать новые таблицы
CREATE TABLE referrals_new (
id BIGINT PRIMARY KEY,
referrer_id BIGINT NOT NULL,
referral_id BIGINT NOT NULL,
referral_code VARCHAR(50) NOT NULL UNIQUE,
created_at TIMESTAMP,
status VARCHAR(50),
bonus_amount DECIMAL(10, 2),
FOREIGN KEY (referrer_id) REFERENCES users(id),
FOREIGN KEY (referral_id) REFERENCES users(id)
);
-- Шаг 2: Если есть старые данные, мигрируем
INSERT INTO referrals_new (id, referrer_id, referral_id, referral_code, created_at, status)
SELECT
ROW_NUMBER() OVER (ORDER BY referrer_id),
referrer_id,
referral_id,
CONCAT('REF-', referral_id, '-', FLOOR(RANDOM() * 1000000)),
CURRENT_TIMESTAMP,
'pending'
FROM legacy_referrals;
-- Шаг 3: Переименовываем (во время maintenance window)
ALTER TABLE referrals RENAME TO referrals_old;
ALTER TABLE referrals_new RENAME TO referrals;
5. Расчёт метрик
Чтобы избежать медленных агрегирующих запросов, используем материализованное представление:
-- Материализованное представление (fast queries)
CREATE MATERIALIZED VIEW referrer_metrics AS
SELECT
r.referrer_id,
COUNT(*) FILTER (WHERE r.status = 'active') as active_count,
COUNT(*) FILTER (WHERE r.status = 'pending') as pending_count,
SUM(r.bonus_amount) FILTER (WHERE r.status = 'active') as total_earned,
MAX(r.bonus_granted_at) as last_bonus_date
FROM referrals r
GROUP BY r.referrer_id;
-- Индекс для быстрого доступа
CREATE INDEX idx_referrer_metrics ON referrer_metrics(referrer_id);
-- Обновление по расписанию (Cron job / Airflow)
REFRESH MATERIALIZED VIEW CONCURRENTLY referrer_metrics;
6. Pipeline обработки рефералов
Apache Airflow DAG:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def process_pending_referrals():
"""Обработать pending статусы: проверить условия, выдать бонусы"""
query = """
UPDATE referrals r
SET status = 'active', bonus_granted_at = NOW()
WHERE r.status = 'pending'
AND r.referral_id IN (
SELECT user_id FROM users_with_first_purchase
WHERE first_purchase_date > r.created_at
)
"""
execute_query(query)
def update_referrer_stats():
"""Обновить кэш статистики"""
query = """
REFRESH MATERIALIZED VIEW CONCURRENTLY referrer_metrics;
"""
execute_query(query)
def handle_rejections():
"""Отклонить рефери, если прошло 30 дней и нет условий"""
query = """
UPDATE referrals
SET status = 'rejected'
WHERE status = 'pending'
AND created_at < NOW() - INTERVAL 30 DAY
AND referral_id NOT IN (SELECT user_id FROM paid_users);
"""
execute_query(query)
with DAG('referral_processing', schedule_interval='@hourly') as dag:
process_task = PythonOperator(
task_id='process_pending',
python_callable=process_pending_referrals
)
stats_task = PythonOperator(
task_id='update_stats',
python_callable=update_referrer_stats
)
reject_task = PythonOperator(
task_id='handle_rejections',
python_callable=handle_rejections
)
process_task >> stats_task >> reject_task
7. Качество данных
Добавляем проверки data quality:
from great_expectations.dataset import PandasDataset
def validate_referrals(df):
assert df['bonus_amount'].notna().all(), "bonus_amount должен быть заполнен"
assert (df['bonus_amount'] >= 0).all(), "bonus_amount не может быть отрицательным"
assert df.duplicated(subset=['referrer_id', 'referral_id']).sum() == 0, "Нет дубликатов"
assert (df['referrer_id'] != df['referral_id']).all(), "Реферер != рефери"
print("✓ Все проверки пройдены")
8. Мониторинг
Ключевые метрики для дашборда:
- Daily активные рефери
- Conversion rate (рефери → активный)
- Average bonus per referrer
- Top 100 referrers
- Failed/Rejected referrals count
Ошибки, которых избежать
- ❌ Не денормализуй без причины — referrer_stats нужна, потому что COUNT(*) на миллионах строк медленная
- ❌ Не забывай about cascading deletes — если удалить пользователя, referrals должны удалиться или учитывать NULL
- ❌ Не выполняй UPDATE для миллионов строк синхронно — это заблокирует БД
- ❌ Не доверяй только БД-логике — приложение должно валидировать бизнес-правила
Заключение
Адаптация модели данных — это итеративный процесс: сначала понимаешь требования, потом проектируешь schema, затем мигрируешь данные без downtime, и наконец автоматизируешь обработку через pipelines с мониторингом качества.