← Назад к вопросам

Какие проблемы могут возникнуть при доставке данных?

1.7 Middle🔥 181 комментариев
#ETL и качество данных#Архитектура и проектирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Какие проблемы могут возникнуть при доставке данных

При доставке данных в systems встречаются многие проблемы.

1. Duplicates (Дубликаты)

Проблема: Kafka переотправляет сообщения при перезагрузке.

# Решение 1: Deduplicate в памяти
seen_ids = set()

for message in kafka_stream:
    if message['id'] in seen_ids:
        continue
    seen_ids.add(message['id'])
    process(message)

# Решение 2: Database level (UNIQUE constraint)
CREATE TABLE processed_events (
    event_id VARCHAR PRIMARY KEY,
    event_data JSONB,
    processed_at TIMESTAMP
);

# Решение 3: Idempotency key
def process_payment(transaction_id, amount):
    # Проверяем, уже ли обработали
    if db.exists('transaction', transaction_id):
        return db.get('transaction', transaction_id)
    
    # Обрабатываем
    result = charge_card(amount)
    db.set('transaction', transaction_id, result)
    return result

2. Out of Order Messages (Сообщения не в порядке)

Проблема: сообщения приходят в неправильном порядке.

# Решение: Window aggregation
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

df = spark.readStream.format("kafka").load()

# Даём 1 минуту на позднее прибытие
windowed = df.withWatermark("timestamp", "1 minute") \
    .groupBy(window("timestamp", "5 minutes")) \
    .agg(sum("amount"))

3. Late Arrivals (Поздние сообщения)

Проблема: данные приходят с задержкой.

# Watermark определяет, как долго ждать
stream.withWatermark("event_time", "10 minutes") \
    .groupBy("user_id") \
    .agg(count("*"))

# Всё после watermark отбрасывается

4. Latency (Задержка)

Проблема: пиковая нагрузка → очередь растёт.

# Решение 1: Autoscaling
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=2)
ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "production")

# Решение 2: Rate limiting
stream.option("maxOffsetsPerTrigger", 100000)

# Решение 3: Partitioning
stream.repartition(32)  # Распределяем на 32 executor

5. Data Corruption (Повреждение данных)

Проблема: сетевые ошибки повреждают данные.

# Решение: Checksum
import hashlib

def send_with_checksum(data):
    checksum = hashlib.md5(json.dumps(data).encode()).hexdigest()
    return {
        'data': data,
        'checksum': checksum
    }

def verify_checksum(message):
    expected = message['checksum']
    actual = hashlib.md5(json.dumps(message['data']).encode()).hexdigest()
    assert expected == actual, "Data corrupted"

6. Network Failures (Сетевые ошибки)

Проблема: connection timeout, packet loss.

# Решение: Retry with exponential backoff
import time

def send_with_retry(data, max_retries=3):
    for attempt in range(max_retries):
        try:
            return kafka_producer.send(data)
        except Exception as e:
            wait_time = 2 ** attempt  # 1s, 2s, 4s
            logger.warning(f"Attempt {attempt} failed, retrying in {wait_time}s")
            time.sleep(wait_time)
    raise Exception("Failed after retries")

7. Schema Mismatch (Несоответствие схемы)

Проблема: новая версия отправляет данные другой структуры.

# Решение: Schema Registry
from confluent_kafka import KafkaProducer
from confluent_kafka.schema_registry import SchemaRegistryClient

schema_registry = SchemaRegistryClient({'url': 'http://localhost:8081'})

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    schema_registry_client=schema_registry,
    value_schema="{'type': 'record', 'fields': [...]}"
)

# Автоматическая валидация схемы

8. Back Pressure (Обратное давление)

Проблема: producer быстрее, чем consumer может обработать.

# Решение: Buffer limit
producer = KafkaProducer(
    buffer_memory=67108864,  # 64MB буфер
    batch_size=16384,        # Batch размер
    linger_ms=10             # Ждём 10ms перед отправкой
)

# Consumer lag мониторинг
consumer_lag = (latest_offset - consumer_offset)
if consumer_lag > 100000:
    alert("High consumer lag detected")

9. Exactly-Once vs At-Least-Once

Проблема: какую гарантию выбрать?

# At-least-once: быстро, возможны дубликаты
producer.send(topic, data)

# Exactly-once: медленно, нет дубликатов
with transaction:
    producer.send_transactional(topic, data)
    producer.commit_transaction()

10. Partition Rebalancing (Переухудшение разделов)

Проблема: consumer group переубирается → падает throughput.

# Решение: Graceful shutdown
from signal import signal, SIGTERM

def handle_sigterm(signum, frame):
    consumer.close()
    exit(0)

signal(SIGTERM, handle_sigterm)

# Перебалансировка происходит медленнее, но ожидаемо

11. Monitoring и Alerting

from prometheus_client import Counter, Histogram

# Метрики для проблем
delivery_lag = Histogram('delivery_lag_seconds', 'Delivery lag')
duplicate_count = Counter('duplicates_total', 'Duplicate messages')
error_rate = Counter('delivery_errors', 'Delivery errors')

with delivery_lag.time():
    deliver_message(msg)

12. Реальный пример: Решение всех проблем

class RobustDataPipeline:
    def __init__(self):
        self.seen_events = set()
        self.logger = setup_logging()
    
    def process_stream(self, kafka_stream):
        for message in kafka_stream:
            try:
                # Checksum валидация
                if not self.verify_checksum(message):
                    self.logger.error("Checksum failed")
                    continue
                
                # Dupliate check
                if message['id'] in self.seen_events:
                    continue
                self.seen_events.add(message['id'])
                
                # Watermark для late arrivals
                if self.is_too_late(message['timestamp']):
                    continue
                
                # Process
                self.save_to_db(message)
                
            except Exception as e:
                self.logger.error(f"Processing failed: {e}")
                self.retry_with_backoff(message)

Итог: комбинация retry, deduplicate, watermark, monitoring решает 99% проблем.