← Назад к вопросам

Синхронизация данных с внешнего API

2.0 Middle🔥 231 комментариев
#Python Core#REST API и HTTP

Условие

Необходимо загрузить 100 000+ JSON-объектов из внешнего API и синхронизировать их с таблицей в базе данных.

Реализуйте:

  • Создание новых записей
  • Обновление изменённых полей
  • Пометку удалённых записей

Подходы к решению

  1. Отдельные запросы (N запросов) — почему плохо?
  2. Загрузка всей таблицы в память — когда не работает?
  3. Батчевая обработка — как реализовать оптимально?

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Синхронизация данных с внешнего API

Проблема

При работе с большими объёмами данных (100К+) критичны производительность, надёжность и консистентность. Наивные подходы приводят к таймаутам, утечкам памяти и потере данных.

1. Отдельные запросы (N запросы) — почему плохо?

Проблемы:

  • O(N) сетевых операций — для 100K объектов это 100K запросов
  • Нет возможности батчировать обновления в БД
  • Высокая вероятность сбоев — любой timeout приводит к переделке
  • Расходует соединения — connection pool исчерпывается
  • Нарушает rate-limit API
# ❌ Плохо: N запросы
for item_id in all_ids:
    response = requests.get(f"https://api.example.com/items/{item_id}")
    db.update(item_id, response.json())

2. Загрузка всей таблицы в память — когда не работает?

Проблемы:

  • RAM исчерпывается — 100K объектов × 10KB = 1GB минимум
  • На больших таблицах это невозможно
  • Потеря актуальности — если данные меняются во время загрузки
  • Медленнее чем батчи — переделываем всё каждый раз

3. Батчевая обработка — оптимальное решение

Ключевые принципы:

  • Загружаем данные чанками (1000-2000 объектов)
  • Используем UPSERT (INSERT ... ON CONFLICT)
  • Отслеживаем обработанные ID для детектирования удалённых
  • Параллелизм для ускорения
import requests
from sqlalchemy import text
from sqlalchemy.orm import Session

class APIDataSynchronizer:
    def __init__(self, db: Session, api_url: str, batch_size: int = 1000):
        self.db = db
        self.api_url = api_url
        self.batch_size = batch_size
    
    def fetch_from_api(self, offset: int) -> list:
        response = requests.get(
            f"{self.api_url}/items",
            params={"offset": offset, "limit": self.batch_size},
            timeout=30
        )
        response.raise_for_status()
        return response.json().get("items", [])
    
    def sync_batch(self, items: list) -> None:
        if not items:
            return
        
        query = text("""
            INSERT INTO items (id, title, data, updated_at)
            VALUES (:id, :title, :data, NOW())
            ON CONFLICT (id) DO UPDATE SET
                title = EXCLUDED.title,
                data = EXCLUDED.data,
                updated_at = NOW()
        """)
        
        for item in items:
            self.db.execute(
                query,
                {"id": item["id"], "title": item["title"], "data": item}
            )
        self.db.commit()
    
    def mark_deleted(self, processed_ids: set) -> None:
        query = text("""
            UPDATE items 
            SET is_deleted = true, deleted_at = NOW()
            WHERE id NOT IN :processed_ids AND is_deleted = false
        """)
        self.db.execute(query, {"processed_ids": tuple(processed_ids)})
        self.db.commit()
    
    def sync_all(self) -> None:
        offset = 0
        processed_ids = set()
        
        while True:
            items = self.fetch_from_api(offset)
            if not items:
                break
            
            self.sync_batch(items)
            processed_ids.update(item["id"] for item in items)
            offset += self.batch_size
            print(f"Обработано: {offset} записей")
        
        self.mark_deleted(processed_ids)
        print(f"Синхронизация завершена. Всего: {offset}")

Асинхронный батч (быстрее)

import asyncio
import aiohttp

class AsyncAPIDataSynchronizer(APIDataSynchronizer):
    def __init__(self, db: Session, api_url: str, batch_size: int = 1000):
        super().__init__(db, api_url, batch_size)
        self.concurrent_requests = 5
    
    async def fetch_from_api_async(self, session: aiohttp.ClientSession, offset: int) -> list:
        async with session.get(
            f"{self.api_url}/items",
            params={"offset": offset, "limit": self.batch_size},
            timeout=aiohttp.ClientTimeout(total=30)
        ) as resp:
            resp.raise_for_status()
            data = await resp.json()
            return data.get("items", [])

Рекомендации

ПараметрЗначениеПочему
batch_size500-2000Баланс между памятью и БД операциями
concurrent3-10Не перегружаем API
timeout30sНе бесконечно
retryexponential backoffДля 429/5xx ошибок

Вывод: Батчевая обработка с UPSERT — стандарт для синхронизации больших объёмов. Асинхронность даёт 3-5x ускорение.