← Назад к вопросам
Какие проблемы могут возникнуть при доставке данных?
1.7 Middle🔥 181 комментариев
#ETL и качество данных#Архитектура и проектирование
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Какие проблемы могут возникнуть при доставке данных
При доставке данных в systems встречаются многие проблемы.
1. Duplicates (Дубликаты)
Проблема: Kafka переотправляет сообщения при перезагрузке.
# Решение 1: Deduplicate в памяти
seen_ids = set()
for message in kafka_stream:
if message['id'] in seen_ids:
continue
seen_ids.add(message['id'])
process(message)
# Решение 2: Database level (UNIQUE constraint)
CREATE TABLE processed_events (
event_id VARCHAR PRIMARY KEY,
event_data JSONB,
processed_at TIMESTAMP
);
# Решение 3: Idempotency key
def process_payment(transaction_id, amount):
# Проверяем, уже ли обработали
if db.exists('transaction', transaction_id):
return db.get('transaction', transaction_id)
# Обрабатываем
result = charge_card(amount)
db.set('transaction', transaction_id, result)
return result
2. Out of Order Messages (Сообщения не в порядке)
Проблема: сообщения приходят в неправильном порядке.
# Решение: Window aggregation
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col
df = spark.readStream.format("kafka").load()
# Даём 1 минуту на позднее прибытие
windowed = df.withWatermark("timestamp", "1 minute") \
.groupBy(window("timestamp", "5 minutes")) \
.agg(sum("amount"))
3. Late Arrivals (Поздние сообщения)
Проблема: данные приходят с задержкой.
# Watermark определяет, как долго ждать
stream.withWatermark("event_time", "10 minutes") \
.groupBy("user_id") \
.agg(count("*"))
# Всё после watermark отбрасывается
4. Latency (Задержка)
Проблема: пиковая нагрузка → очередь растёт.
# Решение 1: Autoscaling
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, batchDuration=2)
ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
# Решение 2: Rate limiting
stream.option("maxOffsetsPerTrigger", 100000)
# Решение 3: Partitioning
stream.repartition(32) # Распределяем на 32 executor
5. Data Corruption (Повреждение данных)
Проблема: сетевые ошибки повреждают данные.
# Решение: Checksum
import hashlib
def send_with_checksum(data):
checksum = hashlib.md5(json.dumps(data).encode()).hexdigest()
return {
'data': data,
'checksum': checksum
}
def verify_checksum(message):
expected = message['checksum']
actual = hashlib.md5(json.dumps(message['data']).encode()).hexdigest()
assert expected == actual, "Data corrupted"
6. Network Failures (Сетевые ошибки)
Проблема: connection timeout, packet loss.
# Решение: Retry with exponential backoff
import time
def send_with_retry(data, max_retries=3):
for attempt in range(max_retries):
try:
return kafka_producer.send(data)
except Exception as e:
wait_time = 2 ** attempt # 1s, 2s, 4s
logger.warning(f"Attempt {attempt} failed, retrying in {wait_time}s")
time.sleep(wait_time)
raise Exception("Failed after retries")
7. Schema Mismatch (Несоответствие схемы)
Проблема: новая версия отправляет данные другой структуры.
# Решение: Schema Registry
from confluent_kafka import KafkaProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
schema_registry = SchemaRegistryClient({'url': 'http://localhost:8081'})
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
schema_registry_client=schema_registry,
value_schema="{'type': 'record', 'fields': [...]}"
)
# Автоматическая валидация схемы
8. Back Pressure (Обратное давление)
Проблема: producer быстрее, чем consumer может обработать.
# Решение: Buffer limit
producer = KafkaProducer(
buffer_memory=67108864, # 64MB буфер
batch_size=16384, # Batch размер
linger_ms=10 # Ждём 10ms перед отправкой
)
# Consumer lag мониторинг
consumer_lag = (latest_offset - consumer_offset)
if consumer_lag > 100000:
alert("High consumer lag detected")
9. Exactly-Once vs At-Least-Once
Проблема: какую гарантию выбрать?
# At-least-once: быстро, возможны дубликаты
producer.send(topic, data)
# Exactly-once: медленно, нет дубликатов
with transaction:
producer.send_transactional(topic, data)
producer.commit_transaction()
10. Partition Rebalancing (Переухудшение разделов)
Проблема: consumer group переубирается → падает throughput.
# Решение: Graceful shutdown
from signal import signal, SIGTERM
def handle_sigterm(signum, frame):
consumer.close()
exit(0)
signal(SIGTERM, handle_sigterm)
# Перебалансировка происходит медленнее, но ожидаемо
11. Monitoring и Alerting
from prometheus_client import Counter, Histogram
# Метрики для проблем
delivery_lag = Histogram('delivery_lag_seconds', 'Delivery lag')
duplicate_count = Counter('duplicates_total', 'Duplicate messages')
error_rate = Counter('delivery_errors', 'Delivery errors')
with delivery_lag.time():
deliver_message(msg)
12. Реальный пример: Решение всех проблем
class RobustDataPipeline:
def __init__(self):
self.seen_events = set()
self.logger = setup_logging()
def process_stream(self, kafka_stream):
for message in kafka_stream:
try:
# Checksum валидация
if not self.verify_checksum(message):
self.logger.error("Checksum failed")
continue
# Dupliate check
if message['id'] in self.seen_events:
continue
self.seen_events.add(message['id'])
# Watermark для late arrivals
if self.is_too_late(message['timestamp']):
continue
# Process
self.save_to_db(message)
except Exception as e:
self.logger.error(f"Processing failed: {e}")
self.retry_with_backoff(message)
Итог: комбинация retry, deduplicate, watermark, monitoring решает 99% проблем.