← Назад к вопросам
Синхронизация данных с внешнего API
2.0 Middle🔥 231 комментариев
#Python Core#REST API и HTTP
Условие
Необходимо загрузить 100 000+ JSON-объектов из внешнего API и синхронизировать их с таблицей в базе данных.
Реализуйте:
- Создание новых записей
- Обновление изменённых полей
- Пометку удалённых записей
Подходы к решению
- Отдельные запросы (N запросов) — почему плохо?
- Загрузка всей таблицы в память — когда не работает?
- Батчевая обработка — как реализовать оптимально?
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Синхронизация данных с внешнего API
Проблема
При работе с большими объёмами данных (100К+) критичны производительность, надёжность и консистентность. Наивные подходы приводят к таймаутам, утечкам памяти и потере данных.
1. Отдельные запросы (N запросы) — почему плохо?
Проблемы:
- O(N) сетевых операций — для 100K объектов это 100K запросов
- Нет возможности батчировать обновления в БД
- Высокая вероятность сбоев — любой timeout приводит к переделке
- Расходует соединения — connection pool исчерпывается
- Нарушает rate-limit API
# ❌ Плохо: N запросы
for item_id in all_ids:
response = requests.get(f"https://api.example.com/items/{item_id}")
db.update(item_id, response.json())
2. Загрузка всей таблицы в память — когда не работает?
Проблемы:
- RAM исчерпывается — 100K объектов × 10KB = 1GB минимум
- На больших таблицах это невозможно
- Потеря актуальности — если данные меняются во время загрузки
- Медленнее чем батчи — переделываем всё каждый раз
3. Батчевая обработка — оптимальное решение
Ключевые принципы:
- Загружаем данные чанками (1000-2000 объектов)
- Используем UPSERT (INSERT ... ON CONFLICT)
- Отслеживаем обработанные ID для детектирования удалённых
- Параллелизм для ускорения
import requests
from sqlalchemy import text
from sqlalchemy.orm import Session
class APIDataSynchronizer:
def __init__(self, db: Session, api_url: str, batch_size: int = 1000):
self.db = db
self.api_url = api_url
self.batch_size = batch_size
def fetch_from_api(self, offset: int) -> list:
response = requests.get(
f"{self.api_url}/items",
params={"offset": offset, "limit": self.batch_size},
timeout=30
)
response.raise_for_status()
return response.json().get("items", [])
def sync_batch(self, items: list) -> None:
if not items:
return
query = text("""
INSERT INTO items (id, title, data, updated_at)
VALUES (:id, :title, :data, NOW())
ON CONFLICT (id) DO UPDATE SET
title = EXCLUDED.title,
data = EXCLUDED.data,
updated_at = NOW()
""")
for item in items:
self.db.execute(
query,
{"id": item["id"], "title": item["title"], "data": item}
)
self.db.commit()
def mark_deleted(self, processed_ids: set) -> None:
query = text("""
UPDATE items
SET is_deleted = true, deleted_at = NOW()
WHERE id NOT IN :processed_ids AND is_deleted = false
""")
self.db.execute(query, {"processed_ids": tuple(processed_ids)})
self.db.commit()
def sync_all(self) -> None:
offset = 0
processed_ids = set()
while True:
items = self.fetch_from_api(offset)
if not items:
break
self.sync_batch(items)
processed_ids.update(item["id"] for item in items)
offset += self.batch_size
print(f"Обработано: {offset} записей")
self.mark_deleted(processed_ids)
print(f"Синхронизация завершена. Всего: {offset}")
Асинхронный батч (быстрее)
import asyncio
import aiohttp
class AsyncAPIDataSynchronizer(APIDataSynchronizer):
def __init__(self, db: Session, api_url: str, batch_size: int = 1000):
super().__init__(db, api_url, batch_size)
self.concurrent_requests = 5
async def fetch_from_api_async(self, session: aiohttp.ClientSession, offset: int) -> list:
async with session.get(
f"{self.api_url}/items",
params={"offset": offset, "limit": self.batch_size},
timeout=aiohttp.ClientTimeout(total=30)
) as resp:
resp.raise_for_status()
data = await resp.json()
return data.get("items", [])
Рекомендации
| Параметр | Значение | Почему |
|---|---|---|
| batch_size | 500-2000 | Баланс между памятью и БД операциями |
| concurrent | 3-10 | Не перегружаем API |
| timeout | 30s | Не бесконечно |
| retry | exponential backoff | Для 429/5xx ошибок |
Вывод: Батчевая обработка с UPSERT — стандарт для синхронизации больших объёмов. Асинхронность даёт 3-5x ускорение.