← Назад к вопросам

Какие знаешь запросы в API?

1.3 Junior🔥 151 комментариев
#Архитектура и проектирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

API запросы в data инженерии

API запросы — критическая часть работы data engineers'а, так как часто нужно интегрировать данные из различных источников через REST, GraphQL или других API. Рассмотрим основные типы запросов и best practices.

1. HTTP методы (REST API)

GET — получение данных

import requests

# Простой GET запрос
response = requests.get('https://api.example.com/users')
data = response.json()

# С параметрами
response = requests.get(
    'https://api.example.com/users',
    params={
        'page': 1,
        'limit': 100,
        'filter': 'active'
    }
)

# С заголовками (например, авторизация)
headers = {'Authorization': 'Bearer TOKEN'}
response = requests.get(
    'https://api.example.com/users',
    headers=headers
)

POST — создание новых данных

data = {
    'name': 'John',
    'email': 'john@example.com',
    'age': 30
}

response = requests.post(
    'https://api.example.com/users',
    json=data,
    headers={'Authorization': 'Bearer TOKEN'}
)

if response.status_code == 201:
    created = response.json()
    print(f"Created user: {created['id']}")

PUT — обновление полных данных

user_id = 123
updated_data = {'name': 'Jane', 'age': 31}

response = requests.put(
    f'https://api.example.com/users/{user_id}',
    json=updated_data
)

PATCH — частичное обновление

# Только обновляем age, name остаётся неизменным
response = requests.patch(
    f'https://api.example.com/users/{user_id}',
    json={'age': 32}
)

DELETE — удаление

response = requests.delete(
    f'https://api.example.com/users/{user_id}',
    headers={'Authorization': 'Bearer TOKEN'}
)

2. Обработка ошибок в API запросах

def safe_api_call(url, method='GET', **kwargs):
    try:
        if method == 'GET':
            response = requests.get(url, timeout=10, **kwargs)
        elif method == 'POST':
            response = requests.post(url, timeout=10, **kwargs)
        else:
            raise ValueError(f"Unsupported method: {method}")
        
        response.raise_for_status()  # Выбросит ошибку для 4xx, 5xx
        return response.json()
    
    except requests.ConnectionError:
        logger.error(f"Connection error for {url}")
        raise
    except requests.Timeout:
        logger.error(f"Timeout for {url}")
        raise
    except requests.HTTPError as e:
        logger.error(f"HTTP error {e.response.status_code}: {e.response.text}")
        raise
    except ValueError as e:
        logger.error(f"Invalid JSON response: {e}")
        raise

3. Пагинация

Offset-based пагинация:

def fetch_all_paginated(url, page_size=100):
    all_data = []
    page = 0
    
    while True:
        response = requests.get(
            url,
            params={
                'offset': page * page_size,
                'limit': page_size
            }
        )
        data = response.json()
        
        if not data:
            break
        
        all_data.extend(data)
        page += 1
    
    return all_data

Cursor-based пагинация (лучше для больших наборов):

def fetch_with_cursor(url):
    all_data = []
    cursor = None
    
    while True:
        params = {'limit': 100}
        if cursor:
            params['cursor'] = cursor
        
        response = requests.get(url, params=params)
        result = response.json()
        
        all_data.extend(result['data'])
        
        if not result.get('next_cursor'):
            break
        
        cursor = result['next_cursor']
    
    return all_data

4. Rate limiting

Exponential backoff для обработки rate limits:

import time
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_session():
    session = requests.Session()
    retry = Retry(
        total=5,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504]
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

session = create_session()
response = session.get('https://api.example.com/data')

5. Batch API запросы

Параллельные запросы с ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def fetch_user_data(user_ids):
    def fetch_one(user_id):
        response = requests.get(f'https://api.example.com/users/{user_id}')
        return response.json()
    
    results = {}
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(fetch_one, uid): uid for uid in user_ids}
        
        for future in as_completed(futures):
            user_id = futures[future]
            try:
                results[user_id] = future.result()
            except Exception as e:
                logger.error(f"Error fetching user {user_id}: {e}")
    
    return results

Batch endpoint (если API поддерживает):

# Вместо N запросов, отправляем 1
user_ids = [1, 2, 3, 4, 5]

response = requests.post(
    'https://api.example.com/batch',
    json={
        'requests': [
            {'method': 'GET', 'url': f'/users/{uid}'}
            for uid in user_ids
        ]
    }
)

results = response.json()  # Все результаты в одном ответе

6. Работа с GraphQL API

def graphql_query(query_string, variables=None):
    headers = {
        'Authorization': 'Bearer TOKEN',
        'Content-Type': 'application/json'
    }
    
    payload = {
        'query': query_string,
        'variables': variables or {}
    }
    
    response = requests.post(
        'https://api.example.com/graphql',
        json=payload,
        headers=headers
    )
    
    result = response.json()
    if 'errors' in result:
        logger.error(f"GraphQL errors: {result['errors']}")
        raise Exception(result['errors'])
    
    return result['data']

# Использование
query = """
query GetUser($id: ID!) {
    user(id: $id) {
        id
        name
        email
    }
}
"""

user = graphql_query(query, variables={'id': '123'})

7. Кэширование API запросов

from functools import lru_cache
import requests
import hashlib
import json

class CachedAPI:
    def __init__(self, ttl_seconds=3600):
        self.cache = {}
        self.ttl = ttl_seconds
        self.last_fetch = {}
    
    def get(self, url, params=None):
        key = hashlib.md5(
            json.dumps({
                'url': url,
                'params': params
            }).encode()
        ).hexdigest()
        
        import time
        now = time.time()
        
        # Проверяем кэш
        if key in self.cache:
            if now - self.last_fetch[key] < self.ttl:
                return self.cache[key]
        
        # Выполняем запрос
        response = requests.get(url, params=params)
        data = response.json()
        
        # Кэшируем
        self.cache[key] = data
        self.last_fetch[key] = now
        
        return data

api = CachedAPI(ttl_seconds=3600)
data = api.get('https://api.example.com/users')

8. Streaming больших ответов

def stream_api_response(url):
    response = requests.get(url, stream=True)
    
    for chunk in response.iter_lines():
        if chunk:
            data = json.loads(chunk)
            yield data

# Использование
for item in stream_api_response('https://api.example.com/events'):
    process_item(item)

9. Работа с файлами через API

# Загрузка файла
with open('data.csv', 'rb') as f:
    files = {'file': f}
    response = requests.post(
        'https://api.example.com/upload',
        files=files
    )

# Загрузка файла из API
response = requests.get(
    'https://api.example.com/download/report.csv',
    stream=True
)

with open('downloaded_report.csv', 'wb') as f:
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)

10. Аутентификация в API

Bearer token:

headers = {'Authorization': 'Bearer eyJhbGc...'}
response = requests.get(url, headers=headers)

API Key:

headers = {'X-API-Key': 'your_api_key_here'}
response = requests.get(url, headers=headers)

Basic auth:

from requests.auth import HTTPBasicAuth
response = requests.get(
    url,
    auth=HTTPBasicAuth('username', 'password')
)

OAuth 2.0:

from requests_oauthlib import OAuth2Session

client_id = 'your_client_id'
client_secret = 'your_client_secret'

oauth = OAuth2Session(
    client_id,
    redirect_uri='http://localhost:8000/callback',
    scope=['user:email']
)

# Получить токен
token = oauth.fetch_token(
    'https://api.example.com/oauth/token',
    client_secret=client_secret,
    authorization_response='...',
)

# Использовать токен
response = oauth.get('https://api.example.com/user')

Best Practices

  1. Всегда обрабатывай ошибки и логируй их
  2. Используй таймауты для всех запросов
  3. Реализуй retry logic для отказоустойчивости
  4. Кэшируй результаты, если возможно
  5. Монитори rate limits API
  6. Документируй API endpoints, которые используешь
  7. Используй параллельные запросы для ускорения
  8. Валидируй данные после получения от API
  9. Вёрсионируй API endpoints (используй /v1/, /v2/)

Выводы

Работу с API запросами необходимо подходить ответственно: правильная обработка ошибок, rate limiting, кэширование и параллелизм. Для data engineers это критические навыки при интеграции данных из различных источников.