← Назад к вопросам

Как избежать нагрузки на источник данных?

2.0 Middle🔥 151 комментариев
#ETL и качество данных

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как избежать нагрузки на источник данных

Источники данных (базы данных, API, хранилища) часто являются узким местом в data pipeline. Перегруженный источник данных замедляет работу, может привести к сбоям и недоступности. Я расскажу о стратегиях уменьшения нагрузки.

1. Извлечение данных (Extract) — Оптимизация запросов

Проблема: Неоптимальные запросы нагружают БД ненужной работой.

Решение — Push-down предикатов:

-- ❌ Плохо: читаешь ВСЕ данные, потом фильтруешь в приложении
SELECT * FROM customers;  -- 10 млн строк

-- ✅ Хорошо: фильтруешь на уровне БД
SELECT id, name, email FROM customers WHERE created_at > 2024-01-01;

Oптимизация SELECT запросов:

-- ❌ Плохо: SELECT *
SELECT * FROM orders;

-- ✅ Хорошо: выбирай только нужные колонки
SELECT order_id, customer_id, total_amount FROM orders WHERE status = completed;

-- ❌ Плохо: JOIN БЕЗ индекса
SELECT * FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE c.country = US;

-- ✅ Хорошо: убедись что есть индекс на customer_id
CREATE INDEX idx_orders_customer_id ON orders(customer_id);
CREATE INDEX idx_customers_country ON customers(country);

Снижение количества запросов:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("extract").getOrCreate()

# ❌ Плохо: N+1 запросы
customers = spark.read.jdbc(url, "SELECT * FROM customers", connectionProperties)
for customer_id in customer_ids:
    orders = spark.read.jdbc(url, f"SELECT * FROM orders WHERE customer_id = {customer_id}", ...)

# ✅ Хорошо: один запрос с условием
customer_ids_list = [1, 2, 3, 4, 5]
query = f"SELECT * FROM orders WHERE customer_id IN ({..join(map(str, customer_ids_list))})"
orders = spark.read.jdbc(url, f"({query}) as subquery", connectionProperties)

2. Кэширование данных

Кэширование на уровне приложения:

from functools import lru_cache
from datetime import datetime, timedelta

class DataCache:
    def __init__(self, ttl_seconds=3600):
        self.cache = {}
        self.ttl = ttl_seconds
    
    def get(self, key, fetch_fn):
        """
        Если данные есть в кэше и не устарели — вернуть кэш.
        Иначе — вызвать функцию для получения свежих данных.
        """
        if key in self.cache:
            data, timestamp = self.cache[key]
            if (datetime.now() - timestamp).total_seconds() < self.ttl:
                return data
        
        # Получить свежие данные
        data = fetch_fn()
        self.cache[key] = (data, datetime.now())
        return data

# Использование
cache = DataCache(ttl_seconds=3600)  # 1 час

def fetch_customers():
    return spark.read.jdbc(url, "SELECT * FROM customers", ...)

customers_df = cache.get("customers", fetch_customers)

Кэширование в Spark:

# Кэширование DataFrame в памяти
df = spark.read.parquet("data.parquet")
df.cache()  # Кэшируется при первом action
df.count()  # Action: кэшируется

# Использование кэша несколько раз (без переоценки)
df.show()        # Быстро — из кэша
df.groupBy("city").count().show()  # Быстро — из кэша

df.unpersist()  # Очистить кэш

3. Инкрементальное извлечение данных (Incremental Load)

Проблема: Каждый раз читаешь ВСЕ данные, хотя новые только за последний час/день.

Решение — Incremental Load с checkpoint:

from pyspark.sql.functions import col, max as spark_max

spark = SparkSession.builder.appName("incremental").getOrCreate()

# Прочитать checkpoint (последний обработанный timestamp)
try:
    with open("checkpoint.txt", "r") as f:
        last_timestamp = f.read().strip()
except FileNotFoundError:
    last_timestamp = "1970-01-01"

# Извлечь только новые данные
df = spark.read.jdbc(
    url=connection_url,
    table="(SELECT * FROM orders WHERE updated_at > {}) t".format(last_timestamp),
    properties=connection_properties
)

# Обработать и сохранить
df.write.format("delta").mode("append").save("s3://bucket/orders")

# Обновить checkpoint
new_timestamp = df.agg(spark_max(col("updated_at"))).collect()[0][0]
with open("checkpoint.txt", "w") as f:
    f.write(str(new_timestamp))

print(f"Loaded {df.count()} new records. Last timestamp: {new_timestamp}")

4. Connection pooling и батчинг

Connection pooling — переиспользование соединений:

from sqlalchemy import create_engine, pool

# ❌ Плохо: новое соединение для каждого запроса
for i in range(1000):
    connection = psycopg2.connect("dbname=mydb user=postgres")
    cursor = connection.cursor()
    cursor.execute("SELECT count(*) FROM users")
    connection.close()  # Дорого!

# ✅ Хорошо: pool переиспользует соединения
engine = create_engine(
    "postgresql://user:password@localhost/mydb",
    poolclass=pool.QueuePool,
    pool_size=10,       # Количество соединений в pool
    max_overflow=20,    # Доп. соединения если нужно
    pool_recycle=3600   # Переиспользовать соединение макс. 1 час
)

with engine.connect() as connection:
    result = connection.execute("SELECT count(*) FROM users")

Батчинг — отправка данных партиями:

def batch_insert(records, batch_size=1000):
    """Вставить записи батчами вместо один-за-одним."""
    for i in range(0, len(records), batch_size):
        batch = records[i:i+batch_size]
        
        # Одна вставка за батч (очень эффективнее)
        insert_stmt = "INSERT INTO users (name, email) VALUES %s"
        execute_values(cursor, insert_stmt, batch)
        
        print(f"Inserted batch {i//batch_size + 1}")

records = [(f"User{i}", f"user{i}@example.com") for i in range(10000)]
batch_insert(records)

5. Отделение OLTP от OLAP

Проблема: Тяжёлые аналитические запросы замораживают операционные системы (OLTP).

Решение — CDC (Change Data Capture) + отдельное хранилище:

Production DB (OLTP)
      ↓ (CDC: Capture changes)
      ↓
Data Lake (Parquet, Delta)
      ↓ (Heavy analytics)
      ↓
BI Dashboard (читает из Data Lake, не OLTP)

Пример с Kafka для CDC:

# Producer: читает изменения из БД
from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=[localhost:9092])

# Эмулируем CDC: отправляем изменения в Kafka
def publish_changes(table_name, records):
    for record in records:
        producer.send(cdc-topic, json.dumps({
            table: table_name,
            operation: INSERT,
            data: record
        }).encode())

# Consumer: читает из Kafka, пишет в Data Lake
def process_changes_from_kafka():
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json, col, schema_of_json
    
    spark = SparkSession.builder.appName("cdc").getOrCreate()
    
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "cdc-topic") \
        .load()
    
    # Вычисляем и пишем в Data Lake
    df.writeStream \
        .format("delta") \
        .mode("append") \
        .option("path", "s3://bucket/changes") \
        .option("checkpointLocation", "s3://bucket/checkpoint") \
        .start()

6. Read replicas и load balancing

Использование read-только реплик для аналитики:

# ❌ Плохо: все запросы на основную БД
df = spark.read.jdbc(
    url="jdbc:postgresql://primary.db.com:5432/mydb",
    table="large_table",
    properties=connection_properties
)

# ✅ Хорошо: читаешь с read-only реплики
df = spark.read.jdbc(
    url="jdbc:postgresql://replica-1.db.com:5432/mydb",
    table="large_table",
    properties=connection_properties
)

Load balancing между несколькими репликами:

import random

REPLICAS = [
    "replica-1.db.com",
    "replica-2.db.com", 
    "replica-3.db.com"
]

def read_from_replicas(table_name, num_partitions=10):
    """Разные партиции читаются с разных реплик."""
    replica = random.choice(REPLICAS)
    
    df = spark.read.jdbc(
        url=f"jdbc:postgresql://{replica}:5432/mydb",
        table=table_name,
        numPartitions=num_partitions,
        properties=connection_properties
    )
    
    return df

7. Асинхронное извлечение (Async Extract)

Асинхронные запросы — не блокируют основной поток:

import asyncio
import aiohttp

async def fetch_data_async(url):
    """Асинхронный запрос к API."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

async def fetch_multiple_endpoints():
    """Параллельно фетчить несколько endpoints."""
    tasks = [
        fetch_data_async("https://api.example.com/users"),
        fetch_data_async("https://api.example.com/orders"),
        fetch_data_async("https://api.example.com/products")
    ]
    
    results = await asyncio.gather(*tasks)
    return results

# Запустить
results = asyncio.run(fetch_multiple_endpoints())

Чеклист оптимизации нагрузки

  • Оптимизированы запросы (Push-down predicates, SELECT нужные колонки)
  • Созданы индексы на часто используемые колонки
  • Используется кэширование (на уровне приложения и Spark)
  • Реализовано инкрементальное извлечение (не полный reload)
  • Connection pooling настроен (переиспользование соединений)
  • Батчинг используется при вставке данных
  • Аналитика отделена от production БД (read replicas)
  • Используется асинхронность где применима

Вывод

Уменьшение нагрузки на источник данных — это комбинация техник: оптимизации запросов, кэширования, инкрементальной загрузки и архитектурных решений (CDC, read replicas). Правильный подход экономит ресурсы, повышает надёжность и скорость всей системы.