Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Проблемы при доставке данных (Data Delivery Issues)
Data delivery — это критичный процесс в data engineering, где данные передаются от источников к целевым системам (data warehouse, озёра данных, аналитикам). Множество проблем могут привести к потере, задержке или нарушению целостности данных.
1. Проблемы целостности данных
Дублирование данных
-- Проблема: при retry могут загрузиться дубли
-- Решение: использовать UNIQUE constraint и idempotency
CREATE TABLE events_deduplicated (
event_id UUID PRIMARY KEY,
user_id UUID,
event_type VARCHAR(50),
timestamp TIMESTAMP,
data JSONB
);
-- Или через ON CONFLICT
INSERT INTO events_deduplicated (event_id, user_id, event_type, timestamp, data)
VALUES (:event_id, :user_id, :event_type, :timestamp, :data)
ON CONFLICT (event_id) DO NOTHING; -- пропускаем дубли
-- Обнаружение дублей
SELECT user_id, COUNT(*) as cnt
FROM events_deduplicated
GROUP BY user_id
HAVING COUNT(*) > 1;
Потеря данных
# Потеря может произойти на разных этапах
# 1. На источнике (API rate limit, crash)
# 2. При передаче (network failure)
# 3. При обработке (исключение в коде)
# 4. При сохранении (BD constraint)
def safe_data_delivery(source_query, target_table):
"""
Безопасная доставка с retry и логированием
"""
from tenacity import retry, stop_after_attempt, wait_exponential
import logging
logger = logging.getLogger(__name__)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
def extract_and_load():
try:
# Читаем данные
rows = db_source.execute(source_query)
row_count = 0
for row in rows:
try:
# Вставляем с обработкой ошибок
db_target.insert(target_table, row)
row_count += 1
except Exception as e:
logger.error(f"Failed to insert row: {row}, error: {e}")
raise # перепробовать целый batch
logger.info(f"Successfully loaded {row_count} rows")
return row_count
except Exception as e:
logger.error(f"Data delivery failed: {e}")
raise
return extract_and_load()
2. Проблемы производительности и задержки
Медленные запросы на источнике
# Проблема: большой SELECT может заблокировать production БД
# Решение: используй LIMIT, батчинг и чтение реплики
def extract_in_batches(source_table, batch_size=10000):
"""
Извлечение больших объёмов батчами
"""
offset = 0
while True:
query = f"""
SELECT * FROM {source_table}
ORDER BY id
LIMIT {batch_size}
OFFSET {offset}
"""
# Читаем с read replica, а не с primary
batch = read_replica.execute(query)
if not batch:
break
yield batch
offset += batch_size
# Небольшая задержка чтобы не нагружать replicas
time.sleep(0.1)
# Использование
for batch in extract_in_batches('large_table'):
load_to_warehouse(batch)
Сетевые задержки
# Проблема: медленный интернет, timeout'ы
# Решение: connection pooling, compression, parallel requests
import requests
from concurrent.futures import ThreadPoolExecutor
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
def create_resilient_session():
"""
Сессия с retry и connection pooling
"""
session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=1, # 1s, 2s, 4s
status_forcelist=[429, 500, 502, 503, 504], # retry на эти ошибки
method_whitelist=["HEAD", "GET", "OPTIONS"] # safe методы
)
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10, # connection pool
pool_maxsize=10
)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
# Параллельные запросы к API
def parallel_api_calls(urls, max_workers=5):
session = create_resilient_session()
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(session.get, url) for url in urls]
for future in futures:
try:
response = future.result(timeout=30)
results.append(response.json())
except Exception as e:
logger.error(f"API call failed: {e}")
return results
3. Проблемы схемы и несовместимости
Несовместимость типов данных
-- Проблема: JSON данные с непредсказуемой структурой
-- Решение: валидация и преобразование
CREATE TABLE raw_events (
event_id UUID,
raw_data JSONB
);
-- Проверяем структуру перед вставкой
CREATE TABLE validated_events AS
SELECT
event_id,
raw_data->>'event_type' as event_type,
(raw_data->>'timestamp')::TIMESTAMP as timestamp,
COALESCE((raw_data->>'user_id'), 'unknown') as user_id
FROM raw_events
WHERE
-- Валидируем структуру
raw_data ? 'event_type'
AND raw_data ? 'timestamp'
AND raw_data->>'timestamp' ~ '^\d{4}-\d{2}-\d{2}';
-- Проблемные записи отдельно
CREATE TABLE invalid_events AS
SELECT * FROM raw_events
WHERE NOT (raw_data ? 'event_type' AND raw_data ? 'timestamp');
Schema Evolution (развитие схемы)
# Проблема: источник добавил новые поля, как это отловить?
# Решение: мониторинг схемы
def detect_schema_changes(old_schema, new_data_sample):
"""
Обнаруживает изменения в схеме
"""
import json
from jsonschema import Draft7Validator
# Генерируем схему из выборки
new_schema = infer_schema_from_sample(new_data_sample)
# Проверяем изменения
old_fields = set(old_schema.keys())
new_fields = set(new_schema.keys())
added = new_fields - old_fields
removed = old_fields - new_fields
if added or removed:
logger.warning(f"Schema change detected: added={added}, removed={removed}")
send_alert(f"Table schema changed: {added}, {removed}")
# Можем выбрать стратегию
if added:
# Добавляем новые колонки как NULL (backward compatible)
for col in added:
db.execute(f"ALTER TABLE target ADD COLUMN {col} VARCHAR(255)")
if removed:
# Удалённые колонки могут быть проблемой
logger.error(f"Columns were removed: {removed} - need manual review")
return added, removed
4. Проблемы мониторинга и наблюдаемости
Отсутствие fail-fast обнаружения
# Пример: мониторим доставку данных
from datetime import datetime, timedelta
def monitor_data_freshness(table_name, max_age_hours=24):
"""
Проверяем, не застряла ли доставка
"""
query = f"""
SELECT MAX(loaded_at) as last_load
FROM {table_name}
"""
last_load = db.execute(query).scalar()
if not last_load:
send_alert(f"{table_name} has no data at all!")
return False
age = datetime.now() - last_load
if age > timedelta(hours=max_age_hours):
send_alert(
f"{table_name} not updated for {age.total_seconds() / 3600} hours. "
f"Last load: {last_load}"
)
return False
return True
# Мониторим row count
def monitor_row_count_anomaly(table_name):
"""
Обнаруживаем аномальное количество данных
"""
query = f"SELECT COUNT(*) as cnt FROM {table_name}"
current_count = db.execute(query).scalar()
# Сравниваем с историческим среднем
historical_avg = get_historical_avg_count(table_name, days=30)
deviation = abs(current_count - historical_avg) / historical_avg * 100
if deviation > 50: # > 50% отклонение
send_alert(
f"{table_name} row count anomaly: {current_count} rows "
f"(expected ~{historical_avg}, deviation {deviation:.1f}%)"
)
5. Проблемы упорядочения и consistency
Out-of-order данные
-- Проблема: данные приходят не в том порядке
-- Например, событие за 10:00 приходит в 10:30
CREATE TABLE events_ordered AS
SELECT
event_id,
user_id,
event_time,
received_at,
EXTRACT(EPOCH FROM (received_at - event_time)) as delivery_lag_seconds
FROM raw_events
ORDER BY event_time; -- не по received_at!
-- Мониторим backlog задержек
SELECT
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY delivery_lag_seconds) as p50_lag,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY delivery_lag_seconds) as p95_lag,
MAX(delivery_lag_seconds) as max_lag
FROM events_ordered
WHERE received_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours';
6. Частые причины и решения
| Проблема | Причина | Решение |
|---|---|---|
| Дубли | Retry без idempotency | Unique key + ON CONFLICT DO NOTHING |
| Потеря данных | Crash при обработке | Checkpoint + именность всех операций |
| Задержки | Медленный источник | Батчинг, read replica, параллелизм |
| Некорректные данные | Валидация не соответствует | Schema validation, DQ checks |
| Невидимые ошибки | Плохой мониторинг | Алерты на freshness, row count, lag |
| Schema drift | Source меняется | Automatic schema detection + alert |
Best Practices
- Идемпотентность: все операции должны быть безопасны при retry
- Мониторинг: alerting на freshness, completeness, latency
- DQ checks: валидируй данные на каждом шаге
- Retry с backoff: экспоненциальная задержка, не бесконечный retry
- Circuit breaker: если источник всегда падает, отключи его
- Incremental load: не грузи всё подряд, используй watermarks
- Testing: unit тесты для ETL логики, интеграционные для flow'ов
- Документация: какие трансформации применяются, какие constraint'ы
- SLA: определи обещание на latency и completeness
- Disaster recovery: план восстановления при потере целых дней данных