Как избежать нагрузки на источник данных?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Как избежать нагрузки на источник данных
Источники данных (базы данных, API, хранилища) часто являются узким местом в data pipeline. Перегруженный источник данных замедляет работу, может привести к сбоям и недоступности. Я расскажу о стратегиях уменьшения нагрузки.
1. Извлечение данных (Extract) — Оптимизация запросов
Проблема: Неоптимальные запросы нагружают БД ненужной работой.
Решение — Push-down предикатов:
-- ❌ Плохо: читаешь ВСЕ данные, потом фильтруешь в приложении
SELECT * FROM customers; -- 10 млн строк
-- ✅ Хорошо: фильтруешь на уровне БД
SELECT id, name, email FROM customers WHERE created_at > 2024-01-01;
Oптимизация SELECT запросов:
-- ❌ Плохо: SELECT *
SELECT * FROM orders;
-- ✅ Хорошо: выбирай только нужные колонки
SELECT order_id, customer_id, total_amount FROM orders WHERE status = completed;
-- ❌ Плохо: JOIN БЕЗ индекса
SELECT * FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE c.country = US;
-- ✅ Хорошо: убедись что есть индекс на customer_id
CREATE INDEX idx_orders_customer_id ON orders(customer_id);
CREATE INDEX idx_customers_country ON customers(country);
Снижение количества запросов:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("extract").getOrCreate()
# ❌ Плохо: N+1 запросы
customers = spark.read.jdbc(url, "SELECT * FROM customers", connectionProperties)
for customer_id in customer_ids:
orders = spark.read.jdbc(url, f"SELECT * FROM orders WHERE customer_id = {customer_id}", ...)
# ✅ Хорошо: один запрос с условием
customer_ids_list = [1, 2, 3, 4, 5]
query = f"SELECT * FROM orders WHERE customer_id IN ({..join(map(str, customer_ids_list))})"
orders = spark.read.jdbc(url, f"({query}) as subquery", connectionProperties)
2. Кэширование данных
Кэширование на уровне приложения:
from functools import lru_cache
from datetime import datetime, timedelta
class DataCache:
def __init__(self, ttl_seconds=3600):
self.cache = {}
self.ttl = ttl_seconds
def get(self, key, fetch_fn):
"""
Если данные есть в кэше и не устарели — вернуть кэш.
Иначе — вызвать функцию для получения свежих данных.
"""
if key in self.cache:
data, timestamp = self.cache[key]
if (datetime.now() - timestamp).total_seconds() < self.ttl:
return data
# Получить свежие данные
data = fetch_fn()
self.cache[key] = (data, datetime.now())
return data
# Использование
cache = DataCache(ttl_seconds=3600) # 1 час
def fetch_customers():
return spark.read.jdbc(url, "SELECT * FROM customers", ...)
customers_df = cache.get("customers", fetch_customers)
Кэширование в Spark:
# Кэширование DataFrame в памяти
df = spark.read.parquet("data.parquet")
df.cache() # Кэшируется при первом action
df.count() # Action: кэшируется
# Использование кэша несколько раз (без переоценки)
df.show() # Быстро — из кэша
df.groupBy("city").count().show() # Быстро — из кэша
df.unpersist() # Очистить кэш
3. Инкрементальное извлечение данных (Incremental Load)
Проблема: Каждый раз читаешь ВСЕ данные, хотя новые только за последний час/день.
Решение — Incremental Load с checkpoint:
from pyspark.sql.functions import col, max as spark_max
spark = SparkSession.builder.appName("incremental").getOrCreate()
# Прочитать checkpoint (последний обработанный timestamp)
try:
with open("checkpoint.txt", "r") as f:
last_timestamp = f.read().strip()
except FileNotFoundError:
last_timestamp = "1970-01-01"
# Извлечь только новые данные
df = spark.read.jdbc(
url=connection_url,
table="(SELECT * FROM orders WHERE updated_at > {}) t".format(last_timestamp),
properties=connection_properties
)
# Обработать и сохранить
df.write.format("delta").mode("append").save("s3://bucket/orders")
# Обновить checkpoint
new_timestamp = df.agg(spark_max(col("updated_at"))).collect()[0][0]
with open("checkpoint.txt", "w") as f:
f.write(str(new_timestamp))
print(f"Loaded {df.count()} new records. Last timestamp: {new_timestamp}")
4. Connection pooling и батчинг
Connection pooling — переиспользование соединений:
from sqlalchemy import create_engine, pool
# ❌ Плохо: новое соединение для каждого запроса
for i in range(1000):
connection = psycopg2.connect("dbname=mydb user=postgres")
cursor = connection.cursor()
cursor.execute("SELECT count(*) FROM users")
connection.close() # Дорого!
# ✅ Хорошо: pool переиспользует соединения
engine = create_engine(
"postgresql://user:password@localhost/mydb",
poolclass=pool.QueuePool,
pool_size=10, # Количество соединений в pool
max_overflow=20, # Доп. соединения если нужно
pool_recycle=3600 # Переиспользовать соединение макс. 1 час
)
with engine.connect() as connection:
result = connection.execute("SELECT count(*) FROM users")
Батчинг — отправка данных партиями:
def batch_insert(records, batch_size=1000):
"""Вставить записи батчами вместо один-за-одним."""
for i in range(0, len(records), batch_size):
batch = records[i:i+batch_size]
# Одна вставка за батч (очень эффективнее)
insert_stmt = "INSERT INTO users (name, email) VALUES %s"
execute_values(cursor, insert_stmt, batch)
print(f"Inserted batch {i//batch_size + 1}")
records = [(f"User{i}", f"user{i}@example.com") for i in range(10000)]
batch_insert(records)
5. Отделение OLTP от OLAP
Проблема: Тяжёлые аналитические запросы замораживают операционные системы (OLTP).
Решение — CDC (Change Data Capture) + отдельное хранилище:
Production DB (OLTP)
↓ (CDC: Capture changes)
↓
Data Lake (Parquet, Delta)
↓ (Heavy analytics)
↓
BI Dashboard (читает из Data Lake, не OLTP)
Пример с Kafka для CDC:
# Producer: читает изменения из БД
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=[localhost:9092])
# Эмулируем CDC: отправляем изменения в Kafka
def publish_changes(table_name, records):
for record in records:
producer.send(cdc-topic, json.dumps({
table: table_name,
operation: INSERT,
data: record
}).encode())
# Consumer: читает из Kafka, пишет в Data Lake
def process_changes_from_kafka():
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, schema_of_json
spark = SparkSession.builder.appName("cdc").getOrCreate()
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "cdc-topic") \
.load()
# Вычисляем и пишем в Data Lake
df.writeStream \
.format("delta") \
.mode("append") \
.option("path", "s3://bucket/changes") \
.option("checkpointLocation", "s3://bucket/checkpoint") \
.start()
6. Read replicas и load balancing
Использование read-только реплик для аналитики:
# ❌ Плохо: все запросы на основную БД
df = spark.read.jdbc(
url="jdbc:postgresql://primary.db.com:5432/mydb",
table="large_table",
properties=connection_properties
)
# ✅ Хорошо: читаешь с read-only реплики
df = spark.read.jdbc(
url="jdbc:postgresql://replica-1.db.com:5432/mydb",
table="large_table",
properties=connection_properties
)
Load balancing между несколькими репликами:
import random
REPLICAS = [
"replica-1.db.com",
"replica-2.db.com",
"replica-3.db.com"
]
def read_from_replicas(table_name, num_partitions=10):
"""Разные партиции читаются с разных реплик."""
replica = random.choice(REPLICAS)
df = spark.read.jdbc(
url=f"jdbc:postgresql://{replica}:5432/mydb",
table=table_name,
numPartitions=num_partitions,
properties=connection_properties
)
return df
7. Асинхронное извлечение (Async Extract)
Асинхронные запросы — не блокируют основной поток:
import asyncio
import aiohttp
async def fetch_data_async(url):
"""Асинхронный запрос к API."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
async def fetch_multiple_endpoints():
"""Параллельно фетчить несколько endpoints."""
tasks = [
fetch_data_async("https://api.example.com/users"),
fetch_data_async("https://api.example.com/orders"),
fetch_data_async("https://api.example.com/products")
]
results = await asyncio.gather(*tasks)
return results
# Запустить
results = asyncio.run(fetch_multiple_endpoints())
Чеклист оптимизации нагрузки
- Оптимизированы запросы (Push-down predicates, SELECT нужные колонки)
- Созданы индексы на часто используемые колонки
- Используется кэширование (на уровне приложения и Spark)
- Реализовано инкрементальное извлечение (не полный reload)
- Connection pooling настроен (переиспользование соединений)
- Батчинг используется при вставке данных
- Аналитика отделена от production БД (read replicas)
- Используется асинхронность где применима
Вывод
Уменьшение нагрузки на источник данных — это комбинация техник: оптимизации запросов, кэширования, инкрементальной загрузки и архитектурных решений (CDC, read replicas). Правильный подход экономит ресурсы, повышает надёжность и скорость всей системы.