В каком виде данные приходили в вашу систему
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Форматы и способы поступления данных в системы (мой опыт)
Больше всего я работал с разнообразными источниками данных. За 10+ лет встречал практически всё, от аккуратных API до полного хаоса в CSV файлах. Вот реальные примеры.
1. Real-time Streaming (Kafka, RabbitMQ, Kinesis)
Сценарий 1: E-commerce платформа
Данные о событиях пользователя приходили в реальном времени через Kafka:
{
"event_type": "page_view",
"user_id": "usr_123456",
"session_id": "sess_abc789",
"timestamp": "2024-03-23T10:15:30.123Z",
"page_url": "/products/laptop",
"device": "mobile",
"ip_address": "192.168.1.1",
"referrer": "google",
"properties": {
"product_id": "prod_999",
"category": "electronics"
}
}
Как обрабатывал:
from kafka import KafkaConsumer
import json
from datetime import datetime
consumer = KafkaConsumer(
'events',
bootstrap_servers=['kafka:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
event = message.value
# Валидация
if not event.get('user_id') or not event.get('timestamp'):
log_error(f'Invalid event: {event}')
continue
# Обогащение
event['processed_at'] = datetime.utcnow().isoformat()
event['country'] = lookup_country_by_ip(event['ip_address'])
# Сохранение
save_to_clickhouse(event)
save_to_data_lake_s3(event)
Вызовы:
- Duplicate events — нужны идемпотентные операции
- Out-of-order events — temporal anomalies
- Schema evolution — какой-то events приходили со старой схемой
- Late arrivals — события приходили с задержкой часа
2. REST API (Polling)
Сценарий 2: CRM система (Salesforce)
Данные о сделках нужно было синхронизировать каждый час:
import requests
from datetime import datetime, timedelta
import time
class SalesforceExtractor:
def __init__(self, api_token):
self.base_url = 'https://instance.salesforce.com'
self.token = api_token
self.headers = {
'Authorization': f'Bearer {api_token}',
'Content-Type': 'application/json'
}
def extract_opportunities(self):
"""Получить все сделки, обновлённые за последний час"""
# Incremental extraction — важно не грузить всё с начала
last_run = get_last_run_time() # Из БД
since = (datetime.utcnow() - timedelta(hours=1)).isoformat()
all_records = []
offset = 0
limit = 2000 # Salesforce limit
while True:
# SOQL query
query = f"""
SELECT Id, Name, Amount, StageName, CloseDate, AccountId, CreatedDate
FROM Opportunity
WHERE LastModifiedDate >= {since}Z
ORDER BY Id
OFFSET {offset}
LIMIT {limit}
"""
# URL encoding
url = f"{self.base_url}/services/data/v57.0/query"
params = {'q': query}
try:
response = requests.get(
url,
headers=self.headers,
params=params,
timeout=30
)
response.raise_for_status()
data = response.json()
all_records.extend(data['records'])
# Check pagination
if not data.get('nextRecordsUrl'):
break
offset += limit
# Rate limiting (Salesforce: 15 RPS для batch)
time.sleep(0.1)
except requests.exceptions.RequestException as e:
log_error(f'Salesforce API error: {e}')
# Retry logic
time.sleep(5)
continue
return all_records
def transform_and_load(self, records):
"""Transform и load into Data Warehouse"""
for record in records:
# Remove Salesforce metadata
opportunity = {
'sf_id': record['Id'],
'name': record['Name'],
'amount': float(record.get('Amount') or 0),
'stage': record['StageName'],
'close_date': record.get('CloseDate'),
'account_id': record.get('AccountId'),
'created_date': record['CreatedDate'],
'extracted_at': datetime.utcnow(),
}
# Upsert в PostgreSQL
upsert_opportunity(opportunity)
Вызовы:
- Rate limits — Salesforce позволяет 15 запросов в секунду
- Pagination — нужно обработать все батчи
- Schema changes — когда добавляют новые поля в Salesforce
- Deleted records — нужно синхронизировать deletions
3. File uploads (CSV, JSON, Parquet)
Сценарий 3: Маркетинговая компания
Клиенты загружали файлы вручную (ежемесячно):
import pandas as pd
import pyarrow.parquet as pq
from pathlib import Path
import hashlib
class FileIngestionPipeline:
def process_upload(self, file_path: str):
"""Обработать загруженный файл"""
file_path = Path(file_path)
file_hash = self._compute_hash(file_path)
# Проверка на дупликаты
if self._is_duplicate(file_hash):
return {'status': 'duplicate', 'message': 'File already processed'}
# Определить формат
if file_path.suffix == '.csv':
df = pd.read_csv(file_path, low_memory=False)
elif file_path.suffix == '.json':
df = pd.read_json(file_path, lines=True) # JSONL
elif file_path.suffix == '.parquet':
df = pd.read_parquet(file_path)
else:
return {'status': 'error', 'message': f'Unsupported format: {file_path.suffix}'}
# Валидация
validation_errors = self._validate_schema(df)
if validation_errors:
return {'status': 'error', 'details': validation_errors}
# Data quality checks
quality_report = self._check_quality(df)
if quality_report['null_percentage'] > 30:
return {'status': 'warning', 'message': 'High null percentage', 'report': quality_report}
# Обогащение
df['ingestion_date'] = pd.Timestamp.utcnow()
df['source_file'] = file_path.name
df['file_hash'] = file_hash
# Load into warehouse
self._load_to_warehouse(df)
# Archiving
self._archive_file(file_path, file_hash)
return {'status': 'success', 'rows_loaded': len(df)}
def _validate_schema(self, df):
"""Validate schema against expected"""
expected_columns = {'user_id', 'email', 'name', 'signup_date'}
missing = expected_columns - set(df.columns)
extra = set(df.columns) - expected_columns
errors = []
if missing:
errors.append(f'Missing columns: {missing}')
if extra:
errors.append(f'Extra columns: {extra}')
# Type validation
if 'email' in df.columns and not df['email'].str.match(r'^[\w\.-]+@[\w\.-]+\.\w+$').all():
errors.append('Invalid email format')
return errors
def _check_quality(self, df):
"""Data quality report"""
return {
'total_rows': len(df),
'null_percentage': (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100,
'duplicates': df.duplicated().sum(),
'column_nulls': df.isnull().sum().to_dict(),
}
Вызовы:
- No schema — CSV файл может быть составлен пользователем вручную
- Encoding issues — Windows-1251 vs UTF-8
- Duplicate uploads — пользователь загрузил файл дважды
- Data drift — с месяца на месяц структура меняется
4. Database Replication (CDC — Change Data Capture)
Сценарий 4: Operational database replication
Нужно было реплицировать данные из production PostgreSQL в Data Warehouse:
# Использовал Debezium + Kafka
# Debezium автоматически:
# - Читает WAL (Write-Ahead Log) из PostgreSQL
# - Отправляет changes в Kafka
# - Гарантирует exactly-once delivery
# Вот формат сообщения
cdc_message = {
'before': {
'id': 123,
'email': 'old@example.com',
'status': 'inactive'
},
'after': {
'id': 123,
'email': 'new@example.com',
'status': 'active'
},
'source': {
'version': '1.9.7.Final',
'connector': 'postgresql',
'name': 'production-db',
'ts_ms': 1711187730000,
'txId': 745,
'lsn': 23456789,
'db': 'production',
'schema': 'public',
'table': 'users',
},
'op': 'u', # u=update, c=create, d=delete, r=read
'ts_ms': 1711187735000
}
# Consumer для применения changes
from kafka import KafkaConsumer
import json
class CDCApplier:
def apply_changes(self, message):
"""Применить change в целевую БД"""
op = message['op']
table = message['source']['table']
if op == 'c':
# INSERT
self.insert(table, message['after'])
elif op == 'u':
# UPDATE — важно обновить по PK
pk = message['after'].get('id')
self.update(table, pk, message['after'])
elif op == 'd':
# DELETE
pk = message['before'].get('id')
self.delete(table, pk)
Вызовы:
- Temporal consistency — changes могут приходить не в порядке
- Cascading deletes — удаление родителя cascades
- Schema evolution — добавляются новые столбцы в production
5. Message Queues (RabbitMQ, AMQP)
Сценарий 5: Микросервисная архитектура
Разные микросервисы публиковали события в RabbitMQ:
import pika
import json
from datetime import datetime
class DataPipeline:
def consume_events(self):
"""Consume from RabbitMQ"""
connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbitmq-broker', heartbeat=600)
)
channel = connection.channel()
# Declare exchange (если не существует)
channel.exchange_declare(
exchange='events',
exchange_type='topic',
durable=True
)
# Declare queue
channel.queue_declare(queue='data-pipeline', durable=True)
# Bind queue to routing keys
channel.queue_bind(
exchange='events',
queue='data-pipeline',
routing_key='orders.*' # orders.created, orders.updated
)
def callback(ch, method, properties, body):
try:
event = json.loads(body)
# Обработка
enriched_event = self.enrich_event(event)
# Сохранение
self.save_to_dw(enriched_event)
# Acknowledge
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# Negative acknowledge — вернуть в очередь
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
log_error(f'Error processing event: {e}')
# Consume
channel.basic_qos(prefetch_count=10) # Обрабатывай 10 событий параллельно
channel.basic_consume(
queue='data-pipeline',
on_message_callback=callback
)
channel.start_consuming()
6. Batch dumps (Periodic exports)
Сценарий 6: Внешние системы
Внешние парнёры отправляли нам дневные дампы:
# S3 структура:
# s3://data-bucket/external/
# └── partner_a/
# ├── 2024-03-20_users.csv.gz
# ├── 2024-03-20_transactions.csv.gz
# └── manifest.json
# Ежедневно:
# 1. Проверяем S3 на новые файлы
# 2. Распаковываем
# 3. Валидируем
# 4. Загружаем в Data Warehouse
import boto3
import gzip
import pandas as pd
class BatchLoader:
def load_daily_dump(self, date_str):
"""Load daily dump from S3"""
s3 = boto3.client('s3')
bucket = 'data-bucket'
prefix = f'external/partner_a/{date_str}_'
# List objects
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in response.get('Contents', []):
key = obj['Key']
# Download
local_file = f'/tmp/{key.split("/")[-1]}'
s3.download_file(bucket, key, local_file)
# Decompress
if key.endswith('.gz'):
with gzip.open(local_file, 'rt') as f:
df = pd.read_csv(f)
else:
df = pd.read_csv(local_file)
# Load
self.load_to_warehouse(df, key)
Summary: Общие паттерны
| Формат | Частота | Объём | Сложность |
|---|---|---|---|
| Streaming (Kafka) | Real-time | Гигабайты/день | Высокая |
| API polling | Hourly/Daily | Мегабайты | Средняя |
| File uploads | Manual/Daily | Переменно | Средняя |
| CDC | Real-time | Мегабайты/день | Высокая |
| Message queues | Real-time | Мегабайты/день | Средняя |
| Batch dumps | Daily/Weekly | Гигабайты | Низкая |
Мой опыт: самое сложное — обрабатывать комбинацию форматов в одной системе. Real-time streaming + периодические file uploads = chaos в reconciliation.