← Назад к вопросам

Какие знаешь неизменяемые типы данных?

1.0 Junior🔥 241 комментариев
#Python

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Проблемы при доставке данных (Data Delivery Issues)

Data delivery — это критичный процесс в data engineering, где данные передаются от источников к целевым системам (data warehouse, озёра данных, аналитикам). Множество проблем могут привести к потере, задержке или нарушению целостности данных.

1. Проблемы целостности данных

Дублирование данных

-- Проблема: при retry могут загрузиться дубли
-- Решение: использовать UNIQUE constraint и idempotency

CREATE TABLE events_deduplicated (
    event_id UUID PRIMARY KEY,
    user_id UUID,
    event_type VARCHAR(50),
    timestamp TIMESTAMP,
    data JSONB
);

-- Или через ON CONFLICT
INSERT INTO events_deduplicated (event_id, user_id, event_type, timestamp, data)
VALUES (:event_id, :user_id, :event_type, :timestamp, :data)
ON CONFLICT (event_id) DO NOTHING;  -- пропускаем дубли

-- Обнаружение дублей
SELECT user_id, COUNT(*) as cnt
FROM events_deduplicated
GROUP BY user_id
HAVING COUNT(*) > 1;

Потеря данных

# Потеря может произойти на разных этапах
# 1. На источнике (API rate limit, crash)
# 2. При передаче (network failure)
# 3. При обработке (исключение в коде)
# 4. При сохранении (BD constraint)

def safe_data_delivery(source_query, target_table):
    """
    Безопасная доставка с retry и логированием
    """
    from tenacity import retry, stop_after_attempt, wait_exponential
    import logging
    
    logger = logging.getLogger(__name__)
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    def extract_and_load():
        try:
            # Читаем данные
            rows = db_source.execute(source_query)
            row_count = 0
            
            for row in rows:
                try:
                    # Вставляем с обработкой ошибок
                    db_target.insert(target_table, row)
                    row_count += 1
                except Exception as e:
                    logger.error(f"Failed to insert row: {row}, error: {e}")
                    raise  # перепробовать целый batch
            
            logger.info(f"Successfully loaded {row_count} rows")
            return row_count
        
        except Exception as e:
            logger.error(f"Data delivery failed: {e}")
            raise
    
    return extract_and_load()

2. Проблемы производительности и задержки

Медленные запросы на источнике

# Проблема: большой SELECT может заблокировать production БД
# Решение: используй LIMIT, батчинг и чтение реплики

def extract_in_batches(source_table, batch_size=10000):
    """
    Извлечение больших объёмов батчами
    """
    offset = 0
    
    while True:
        query = f"""
        SELECT * FROM {source_table}
        ORDER BY id
        LIMIT {batch_size}
        OFFSET {offset}
        """
        
        # Читаем с read replica, а не с primary
        batch = read_replica.execute(query)
        
        if not batch:
            break
        
        yield batch
        offset += batch_size
        
        # Небольшая задержка чтобы не нагружать replicas
        time.sleep(0.1)

# Использование
for batch in extract_in_batches('large_table'):
    load_to_warehouse(batch)

Сетевые задержки

# Проблема: медленный интернет, timeout'ы
# Решение: connection pooling, compression, parallel requests

import requests
from concurrent.futures import ThreadPoolExecutor
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

def create_resilient_session():
    """
    Сессия с retry и connection pooling
    """
    session = requests.Session()
    
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,  # 1s, 2s, 4s
        status_forcelist=[429, 500, 502, 503, 504],  # retry на эти ошибки
        method_whitelist=["HEAD", "GET", "OPTIONS"]  # safe методы
    )
    
    adapter = HTTPAdapter(
        max_retries=retry_strategy,
        pool_connections=10,  # connection pool
        pool_maxsize=10
    )
    
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    
    return session

# Параллельные запросы к API
def parallel_api_calls(urls, max_workers=5):
    session = create_resilient_session()
    results = []
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(session.get, url) for url in urls]
        for future in futures:
            try:
                response = future.result(timeout=30)
                results.append(response.json())
            except Exception as e:
                logger.error(f"API call failed: {e}")
    
    return results

3. Проблемы схемы и несовместимости

Несовместимость типов данных

-- Проблема: JSON данные с непредсказуемой структурой
-- Решение: валидация и преобразование

CREATE TABLE raw_events (
    event_id UUID,
    raw_data JSONB
);

-- Проверяем структуру перед вставкой
CREATE TABLE validated_events AS
SELECT 
    event_id,
    raw_data->>'event_type' as event_type,
    (raw_data->>'timestamp')::TIMESTAMP as timestamp,
    COALESCE((raw_data->>'user_id'), 'unknown') as user_id
FROM raw_events
WHERE
    -- Валидируем структуру
    raw_data ? 'event_type'
    AND raw_data ? 'timestamp'
    AND raw_data->>'timestamp' ~ '^\d{4}-\d{2}-\d{2}';

-- Проблемные записи отдельно
CREATE TABLE invalid_events AS
SELECT * FROM raw_events
WHERE NOT (raw_data ? 'event_type' AND raw_data ? 'timestamp');

Schema Evolution (развитие схемы)

# Проблема: источник добавил новые поля, как это отловить?
# Решение: мониторинг схемы

def detect_schema_changes(old_schema, new_data_sample):
    """
    Обнаруживает изменения в схеме
    """
    import json
    from jsonschema import Draft7Validator
    
    # Генерируем схему из выборки
    new_schema = infer_schema_from_sample(new_data_sample)
    
    # Проверяем изменения
    old_fields = set(old_schema.keys())
    new_fields = set(new_schema.keys())
    
    added = new_fields - old_fields
    removed = old_fields - new_fields
    
    if added or removed:
        logger.warning(f"Schema change detected: added={added}, removed={removed}")
        send_alert(f"Table schema changed: {added}, {removed}")
        
        # Можем выбрать стратегию
        if added:
            # Добавляем новые колонки как NULL (backward compatible)
            for col in added:
                db.execute(f"ALTER TABLE target ADD COLUMN {col} VARCHAR(255)")
        
        if removed:
            # Удалённые колонки могут быть проблемой
            logger.error(f"Columns were removed: {removed} - need manual review")
    
    return added, removed

4. Проблемы мониторинга и наблюдаемости

Отсутствие fail-fast обнаружения

# Пример: мониторим доставку данных
from datetime import datetime, timedelta

def monitor_data_freshness(table_name, max_age_hours=24):
    """
    Проверяем, не застряла ли доставка
    """
    query = f"""
    SELECT MAX(loaded_at) as last_load
    FROM {table_name}
    """
    
    last_load = db.execute(query).scalar()
    
    if not last_load:
        send_alert(f"{table_name} has no data at all!")
        return False
    
    age = datetime.now() - last_load
    
    if age > timedelta(hours=max_age_hours):
        send_alert(
            f"{table_name} not updated for {age.total_seconds() / 3600} hours. "
            f"Last load: {last_load}"
        )
        return False
    
    return True

# Мониторим row count
def monitor_row_count_anomaly(table_name):
    """
    Обнаруживаем аномальное количество данных
    """
    query = f"SELECT COUNT(*) as cnt FROM {table_name}"
    current_count = db.execute(query).scalar()
    
    # Сравниваем с историческим среднем
    historical_avg = get_historical_avg_count(table_name, days=30)
    deviation = abs(current_count - historical_avg) / historical_avg * 100
    
    if deviation > 50:  # > 50% отклонение
        send_alert(
            f"{table_name} row count anomaly: {current_count} rows "
            f"(expected ~{historical_avg}, deviation {deviation:.1f}%)"
        )

5. Проблемы упорядочения и consistency

Out-of-order данные

-- Проблема: данные приходят не в том порядке
-- Например, событие за 10:00 приходит в 10:30

CREATE TABLE events_ordered AS
SELECT 
    event_id,
    user_id,
    event_time,
    received_at,
    EXTRACT(EPOCH FROM (received_at - event_time)) as delivery_lag_seconds
FROM raw_events
ORDER BY event_time;  -- не по received_at!

-- Мониторим backlog задержек
SELECT 
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY delivery_lag_seconds) as p50_lag,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY delivery_lag_seconds) as p95_lag,
    MAX(delivery_lag_seconds) as max_lag
FROM events_ordered
WHERE received_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours';

6. Частые причины и решения

ПроблемаПричинаРешение
ДублиRetry без idempotencyUnique key + ON CONFLICT DO NOTHING
Потеря данныхCrash при обработкеCheckpoint + именность всех операций
ЗадержкиМедленный источникБатчинг, read replica, параллелизм
Некорректные данныеВалидация не соответствуетSchema validation, DQ checks
Невидимые ошибкиПлохой мониторингАлерты на freshness, row count, lag
Schema driftSource меняетсяAutomatic schema detection + alert

Best Practices

  • Идемпотентность: все операции должны быть безопасны при retry
  • Мониторинг: alerting на freshness, completeness, latency
  • DQ checks: валидируй данные на каждом шаге
  • Retry с backoff: экспоненциальная задержка, не бесконечный retry
  • Circuit breaker: если источник всегда падает, отключи его
  • Incremental load: не грузи всё подряд, используй watermarks
  • Testing: unit тесты для ETL логики, интеграционные для flow'ов
  • Документация: какие трансформации применяются, какие constraint'ы
  • SLA: определи обещание на latency и completeness
  • Disaster recovery: план восстановления при потере целых дней данных