← Назад к вопросам

В каком виде данные приходили в вашу систему

1.0 Junior🔥 141 комментариев
#Опыт и soft skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Форматы и способы поступления данных в системы (мой опыт)

Больше всего я работал с разнообразными источниками данных. За 10+ лет встречал практически всё, от аккуратных API до полного хаоса в CSV файлах. Вот реальные примеры.

1. Real-time Streaming (Kafka, RabbitMQ, Kinesis)

Сценарий 1: E-commerce платформа

Данные о событиях пользователя приходили в реальном времени через Kafka:

{
  "event_type": "page_view",
  "user_id": "usr_123456",
  "session_id": "sess_abc789",
  "timestamp": "2024-03-23T10:15:30.123Z",
  "page_url": "/products/laptop",
  "device": "mobile",
  "ip_address": "192.168.1.1",
  "referrer": "google",
  "properties": {
    "product_id": "prod_999",
    "category": "electronics"
  }
}

Как обрабатывал:

from kafka import KafkaConsumer
import json
from datetime import datetime

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    event = message.value
    
    # Валидация
    if not event.get('user_id') or not event.get('timestamp'):
        log_error(f'Invalid event: {event}')
        continue
    
    # Обогащение
    event['processed_at'] = datetime.utcnow().isoformat()
    event['country'] = lookup_country_by_ip(event['ip_address'])
    
    # Сохранение
    save_to_clickhouse(event)
    save_to_data_lake_s3(event)

Вызовы:

  • Duplicate events — нужны идемпотентные операции
  • Out-of-order events — temporal anomalies
  • Schema evolution — какой-то events приходили со старой схемой
  • Late arrivals — события приходили с задержкой часа

2. REST API (Polling)

Сценарий 2: CRM система (Salesforce)

Данные о сделках нужно было синхронизировать каждый час:

import requests
from datetime import datetime, timedelta
import time

class SalesforceExtractor:
    def __init__(self, api_token):
        self.base_url = 'https://instance.salesforce.com'
        self.token = api_token
        self.headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
    def extract_opportunities(self):
        """Получить все сделки, обновлённые за последний час"""
        
        # Incremental extraction — важно не грузить всё с начала
        last_run = get_last_run_time()  # Из БД
        since = (datetime.utcnow() - timedelta(hours=1)).isoformat()
        
        all_records = []
        offset = 0
        limit = 2000  # Salesforce limit
        
        while True:
            # SOQL query
            query = f"""
            SELECT Id, Name, Amount, StageName, CloseDate, AccountId, CreatedDate
            FROM Opportunity
            WHERE LastModifiedDate >= {since}Z
            ORDER BY Id
            OFFSET {offset}
            LIMIT {limit}
            """
            
            # URL encoding
            url = f"{self.base_url}/services/data/v57.0/query"
            params = {'q': query}
            
            try:
                response = requests.get(
                    url,
                    headers=self.headers,
                    params=params,
                    timeout=30
                )
                response.raise_for_status()
                
                data = response.json()
                all_records.extend(data['records'])
                
                # Check pagination
                if not data.get('nextRecordsUrl'):
                    break
                    
                offset += limit
                
                # Rate limiting (Salesforce: 15 RPS для batch)
                time.sleep(0.1)
                
            except requests.exceptions.RequestException as e:
                log_error(f'Salesforce API error: {e}')
                # Retry logic
                time.sleep(5)
                continue
        
        return all_records
    
    def transform_and_load(self, records):
        """Transform и load into Data Warehouse"""
        
        for record in records:
            # Remove Salesforce metadata
            opportunity = {
                'sf_id': record['Id'],
                'name': record['Name'],
                'amount': float(record.get('Amount') or 0),
                'stage': record['StageName'],
                'close_date': record.get('CloseDate'),
                'account_id': record.get('AccountId'),
                'created_date': record['CreatedDate'],
                'extracted_at': datetime.utcnow(),
            }
            
            # Upsert в PostgreSQL
            upsert_opportunity(opportunity)

Вызовы:

  • Rate limits — Salesforce позволяет 15 запросов в секунду
  • Pagination — нужно обработать все батчи
  • Schema changes — когда добавляют новые поля в Salesforce
  • Deleted records — нужно синхронизировать deletions

3. File uploads (CSV, JSON, Parquet)

Сценарий 3: Маркетинговая компания

Клиенты загружали файлы вручную (ежемесячно):

import pandas as pd
import pyarrow.parquet as pq
from pathlib import Path
import hashlib

class FileIngestionPipeline:
    def process_upload(self, file_path: str):
        """Обработать загруженный файл"""
        
        file_path = Path(file_path)
        file_hash = self._compute_hash(file_path)
        
        # Проверка на дупликаты
        if self._is_duplicate(file_hash):
            return {'status': 'duplicate', 'message': 'File already processed'}
        
        # Определить формат
        if file_path.suffix == '.csv':
            df = pd.read_csv(file_path, low_memory=False)
        elif file_path.suffix == '.json':
            df = pd.read_json(file_path, lines=True)  # JSONL
        elif file_path.suffix == '.parquet':
            df = pd.read_parquet(file_path)
        else:
            return {'status': 'error', 'message': f'Unsupported format: {file_path.suffix}'}
        
        # Валидация
        validation_errors = self._validate_schema(df)
        if validation_errors:
            return {'status': 'error', 'details': validation_errors}
        
        # Data quality checks
        quality_report = self._check_quality(df)
        if quality_report['null_percentage'] > 30:
            return {'status': 'warning', 'message': 'High null percentage', 'report': quality_report}
        
        # Обогащение
        df['ingestion_date'] = pd.Timestamp.utcnow()
        df['source_file'] = file_path.name
        df['file_hash'] = file_hash
        
        # Load into warehouse
        self._load_to_warehouse(df)
        
        # Archiving
        self._archive_file(file_path, file_hash)
        
        return {'status': 'success', 'rows_loaded': len(df)}
    
    def _validate_schema(self, df):
        """Validate schema against expected"""
        expected_columns = {'user_id', 'email', 'name', 'signup_date'}
        missing = expected_columns - set(df.columns)
        extra = set(df.columns) - expected_columns
        
        errors = []
        if missing:
            errors.append(f'Missing columns: {missing}')
        if extra:
            errors.append(f'Extra columns: {extra}')
        
        # Type validation
        if 'email' in df.columns and not df['email'].str.match(r'^[\w\.-]+@[\w\.-]+\.\w+$').all():
            errors.append('Invalid email format')
        
        return errors
    
    def _check_quality(self, df):
        """Data quality report"""
        return {
            'total_rows': len(df),
            'null_percentage': (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100,
            'duplicates': df.duplicated().sum(),
            'column_nulls': df.isnull().sum().to_dict(),
        }

Вызовы:

  • No schema — CSV файл может быть составлен пользователем вручную
  • Encoding issues — Windows-1251 vs UTF-8
  • Duplicate uploads — пользователь загрузил файл дважды
  • Data drift — с месяца на месяц структура меняется

4. Database Replication (CDC — Change Data Capture)

Сценарий 4: Operational database replication

Нужно было реплицировать данные из production PostgreSQL в Data Warehouse:

# Использовал Debezium + Kafka

# Debezium автоматически:
# - Читает WAL (Write-Ahead Log) из PostgreSQL
# - Отправляет changes в Kafka
# - Гарантирует exactly-once delivery

# Вот формат сообщения
cdc_message = {
    'before': {
        'id': 123,
        'email': 'old@example.com',
        'status': 'inactive'
    },
    'after': {
        'id': 123,
        'email': 'new@example.com',
        'status': 'active'
    },
    'source': {
        'version': '1.9.7.Final',
        'connector': 'postgresql',
        'name': 'production-db',
        'ts_ms': 1711187730000,
        'txId': 745,
        'lsn': 23456789,
        'db': 'production',
        'schema': 'public',
        'table': 'users',
    },
    'op': 'u',  # u=update, c=create, d=delete, r=read
    'ts_ms': 1711187735000
}

# Consumer для применения changes
from kafka import KafkaConsumer
import json

class CDCApplier:
    def apply_changes(self, message):
        """Применить change в целевую БД"""
        
        op = message['op']
        table = message['source']['table']
        
        if op == 'c':
            # INSERT
            self.insert(table, message['after'])
        elif op == 'u':
            # UPDATE — важно обновить по PK
            pk = message['after'].get('id')
            self.update(table, pk, message['after'])
        elif op == 'd':
            # DELETE
            pk = message['before'].get('id')
            self.delete(table, pk)

Вызовы:

  • Temporal consistency — changes могут приходить не в порядке
  • Cascading deletes — удаление родителя cascades
  • Schema evolution — добавляются новые столбцы в production

5. Message Queues (RabbitMQ, AMQP)

Сценарий 5: Микросервисная архитектура

Разные микросервисы публиковали события в RabbitMQ:

import pika
import json
from datetime import datetime

class DataPipeline:
    def consume_events(self):
        """Consume from RabbitMQ"""
        
        connection = pika.BlockingConnection(
            pika.ConnectionParameters('rabbitmq-broker', heartbeat=600)
        )
        channel = connection.channel()
        
        # Declare exchange (если не существует)
        channel.exchange_declare(
            exchange='events',
            exchange_type='topic',
            durable=True
        )
        
        # Declare queue
        channel.queue_declare(queue='data-pipeline', durable=True)
        
        # Bind queue to routing keys
        channel.queue_bind(
            exchange='events',
            queue='data-pipeline',
            routing_key='orders.*'  # orders.created, orders.updated
        )
        
        def callback(ch, method, properties, body):
            try:
                event = json.loads(body)
                
                # Обработка
                enriched_event = self.enrich_event(event)
                
                # Сохранение
                self.save_to_dw(enriched_event)
                
                # Acknowledge
                ch.basic_ack(delivery_tag=method.delivery_tag)
                
            except Exception as e:
                # Negative acknowledge — вернуть в очередь
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
                log_error(f'Error processing event: {e}')
        
        # Consume
        channel.basic_qos(prefetch_count=10)  # Обрабатывай 10 событий параллельно
        channel.basic_consume(
            queue='data-pipeline',
            on_message_callback=callback
        )
        
        channel.start_consuming()

6. Batch dumps (Periodic exports)

Сценарий 6: Внешние системы

Внешние парнёры отправляли нам дневные дампы:

# S3 структура:
# s3://data-bucket/external/
#   └── partner_a/
#       ├── 2024-03-20_users.csv.gz
#       ├── 2024-03-20_transactions.csv.gz
#       └── manifest.json

# Ежедневно:
# 1. Проверяем S3 на новые файлы
# 2. Распаковываем
# 3. Валидируем
# 4. Загружаем в Data Warehouse
import boto3
import gzip
import pandas as pd

class BatchLoader:
    def load_daily_dump(self, date_str):
        """Load daily dump from S3"""
        
        s3 = boto3.client('s3')
        bucket = 'data-bucket'
        prefix = f'external/partner_a/{date_str}_'
        
        # List objects
        response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
        
        for obj in response.get('Contents', []):
            key = obj['Key']
            
            # Download
            local_file = f'/tmp/{key.split("/")[-1]}'
            s3.download_file(bucket, key, local_file)
            
            # Decompress
            if key.endswith('.gz'):
                with gzip.open(local_file, 'rt') as f:
                    df = pd.read_csv(f)
            else:
                df = pd.read_csv(local_file)
            
            # Load
            self.load_to_warehouse(df, key)

Summary: Общие паттерны

ФорматЧастотаОбъёмСложность
Streaming (Kafka)Real-timeГигабайты/деньВысокая
API pollingHourly/DailyМегабайтыСредняя
File uploadsManual/DailyПеременноСредняя
CDCReal-timeМегабайты/деньВысокая
Message queuesReal-timeМегабайты/деньСредняя
Batch dumpsDaily/WeeklyГигабайтыНизкая

Мой опыт: самое сложное — обрабатывать комбинацию форматов в одной системе. Real-time streaming + периодические file uploads = chaos в reconciliation.

В каком виде данные приходили в вашу систему | PrepBro