← Назад к вопросам

Какие библиотеки использовал для работы с API?

1.2 Junior🔥 191 комментариев
#Python#Инструменты разработки

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Библиотеки для работы с API в Data Engineering

Data Engineer часто работает с API для извлечения данных из внешних сервисов (SaaS, публичные API, микросервисы). Правильный выбор библиотеки влияет на надежность, производительность и удобство поддержки пайплайна.

HTTP-клиенты

1. Requests (синхронный)

Requests — самая популярная библиотека для HTTP запросов в Python.

import requests
import json
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def requests_basic():
    """Базовое использование"""
    # GET запрос
    response = requests.get('https://api.github.com/users/octocat')
    
    # Проверяем статус
    if response.status_code == 200:
        data = response.json()  # Автоматически парсит JSON
        print(f"User: {data['login']}")
    else:
        print(f"Error: {response.status_code}")

def requests_with_retry():
    """С автоматическим retry при сбое сети"""
    session = requests.Session()
    
    # Стратегия retry
    retry_strategy = Retry(
        total=3,  # Максимум 3 попытки
        backoff_factor=1,  # Ждём 1s, 2s, 4s
        status_forcelist=[429, 500, 502, 503, 504],  # Retry на эти коды
        method_whitelist=["HEAD", "GET", "OPTIONS"]
    )
    
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    
    response = session.get('https://api.example.com/data')
    return response.json()

def requests_with_auth():
    """С авторизацией"""
    # Bearer token
    headers = {
        'Authorization': 'Bearer YOUR_API_KEY',
        'Content-Type': 'application/json'
    }
    
    response = requests.get(
        'https://api.example.com/users',
        headers=headers
    )
    
    # Или Basic Auth
    response = requests.get(
        'https://api.example.com/data',
        auth=('username', 'password')
    )
    
    return response.json()

def requests_post_with_data():
    """POST запрос с данными"""
    payload = {
        'name': 'Alice',
        'email': 'alice@example.com'
    }
    
    response = requests.post(
        'https://api.example.com/users',
        json=payload,  # Автоматически сериализует в JSON
        headers={'Authorization': 'Bearer TOKEN'}
    )
    
    return response.json()

# Проблемы requests:
# - Синхронный (блокирует поток)
# - Медленно для 1000+ параллельных запросов

Плюсы: простая API, стабильная, хорошая документация Минусы: синхронный (не для высокого throughput)

2. HTTPX (синхронный и асинхронный)

HTTPX — современная замена Requests с поддержкой async.

import httpx
import asyncio

# Синхронное использование (как requests)
def httpx_sync():
    client = httpx.Client()
    response = client.get('https://api.github.com/users/octocat')
    return response.json()

# Асинхронное использование (для высокого throughput)
async def httpx_async():
    async with httpx.AsyncClient() as client:
        response = await client.get('https://api.example.com/users/1')
        return response.json()

# Множественные параллельные запросы
async def fetch_multiple_users():
    """Параллельно загружает 1000 пользователей"""
    async with httpx.AsyncClient() as client:
        tasks = [
            client.get(f'https://api.example.com/users/{i}')
            for i in range(1, 1001)
        ]
        responses = await asyncio.gather(*tasks)
        return [r.json() for r in responses]

# Запустить асинхронный код
data = asyncio.run(fetch_multiple_users())
print(f"Loaded {len(data)} users")

# Retry с exponential backoff
from httpx import Client
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def get_with_retry(url: str):
    client = httpx.Client()
    response = client.get(url)
    response.raise_for_status()
    return response.json()

Плюсы: поддержка async, современная, быстрая Минусы: менее популярна чем requests

3. aiohttp (асинхронный)

aiohttp — специализирована на async, более низкоуровневая.

import aiohttp
import asyncio

async def aiohttp_example():
    """Асинхронные запросы через aiohttp"""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.github.com/users/octocat') as response:
            data = await response.json()
            print(data)

# Параллельные запросы с aiohttp
async def fetch_all_users():
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.get(f'https://api.example.com/users/{i}')
            for i in range(1, 101)
        ]
        responses = await asyncio.gather(*tasks)
        return [await r.json() for r in responses]

asyncio.run(fetch_all_users())

Плюсы: очень быстрая, хорошо для огромного throughput Минусы: сложнее чем requests, требует знания async/await

Специализированные API-клиенты

1. AWS SDK (boto3)

import boto3

# S3 операции
s3 = boto3.client('s3')

# Upload file
s3.upload_file('local_file.txt', 'my-bucket', 'remote_file.txt')

# Download file
s3.download_file('my-bucket', 'remote_file.txt', 'local_file.txt')

# List objects
response = s3.list_objects_v2(Bucket='my-bucket', Prefix='data/')
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])

# Redshift queries
redshift = boto3.client('redshift-data')

response = redshift.execute_statement(
    ClusterIdentifier='my-cluster',
    Database='mydb',
    Sql='SELECT COUNT(*) FROM users'
)

statement_id = response['Id']
result = redshift.get_statement_result(Id=statement_id)
print(result['Records'])

2. Google Cloud Client Libraries

from google.cloud import bigquery
from google.cloud import storage

# BigQuery запросы
client = bigquery.Client()

query = """
    SELECT name, email FROM `project.dataset.users`
    WHERE country = 'US'
    LIMIT 1000
"""

query_job = client.query(query)
for row in query_job:
    print(f"{row.name}: {row.email}")

# Cloud Storage
storage_client = storage.Client()
bucket = storage_client.bucket('my-bucket')
blob = bucket.blob('data/file.parquet')
blob.upload_from_filename('local_file.parquet')

3. Snowflake Python Connector

import snowflake.connector

# Подключение
conn = snowflake.connector.connect(
    user='user@example.com',
    password='password',
    account='xy12345',  # Account identifier
    warehouse='compute_wh',
    database='mydb',
    schema='public'
)

cursor = conn.cursor()

# Запрос
cursor.execute('SELECT * FROM users LIMIT 10')
for row in cursor.fetchall():
    print(row)

# Bulk insert
cursor.execute('CREATE TEMP TABLE staging_users (id INT, name VARCHAR)')
cursor.executemany(
    'INSERT INTO staging_users VALUES (?, ?)',
    [(1, 'Alice'), (2, 'Bob'), (3, 'Carol')]
)

conn.commit()
conn.close()

Работа с REST API (дата engineering специфик)

Пример: ETL из REST API в Data Warehouse

import httpx
import pandas as pd
from datetime import datetime, timedelta
import logging

class APIDataExtractor:
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.headers = {'Authorization': f'Bearer {api_key}'}
        self.logger = logging.getLogger(__name__)
    
    def fetch_paginated_data(self, endpoint: str, page_size: int = 1000):
        """Загружает все данные с пагинацией"""
        all_records = []
        page = 1
        
        with httpx.Client(headers=self.headers) as client:
            while True:
                url = f"{self.base_url}/{endpoint}"
                params = {
                    'page': page,
                    'page_size': page_size,
                    'sort': 'created_at desc'
                }
                
                try:
                    response = client.get(url, params=params, timeout=30.0)
                    response.raise_for_status()
                    
                    data = response.json()
                    records = data.get('data', [])
                    
                    if not records:
                        break
                    
                    all_records.extend(records)
                    
                    self.logger.info(f"Loaded page {page}: {len(records)} records")
                    page += 1
                    
                except httpx.TimeoutException:
                    self.logger.error(f"Timeout on page {page}")
                    break
                except httpx.HTTPStatusError as e:
                    self.logger.error(f"HTTP error {e.response.status_code}: {e.response.text}")
                    break
        
        return all_records
    
    def extract_to_dataframe(self, endpoint: str) -> pd.DataFrame:
        """Загружает данные в pandas DataFrame"""
        records = self.fetch_paginated_data(endpoint)
        df = pd.DataFrame(records)
        
        # Нормализуем типы
        df['created_at'] = pd.to_datetime(df['created_at'])
        df['updated_at'] = pd.to_datetime(df['updated_at'])
        
        return df

# Использование
extractor = APIDataExtractor(
    base_url='https://api.example.com/v1',
    api_key='YOUR_API_KEY'
)

# Загружаем данные
users_df = extractor.extract_to_dataframe('users')
print(f"Loaded {len(users_df)} users")

# Сохраняем в файл или БД
users_df.to_parquet('s3://my-bucket/raw/users/latest.parquet')

Rate Limiting и Backoff

import httpx
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
import time

class RateLimitedAPIClient:
    def __init__(self, requests_per_second: float = 10):
        self.requests_per_second = requests_per_second
        self.min_interval = 1.0 / requests_per_second
        self.last_request_time = 0
    
    @retry(
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=1, min=1, max=60),
        retry=retry_if_exception_type((httpx.HTTPError,))
    )
    def get(self, url: str) -> dict:
        # Rate limiting
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        
        self.last_request_time = time.time()
        
        client = httpx.Client()
        response = client.get(url, timeout=30.0)
        response.raise_for_status()
        return response.json()

# Использование
client = RateLimitedAPIClient(requests_per_second=5)

for i in range(100):
    try:
        data = client.get(f'https://api.example.com/items/{i}')
        print(f"Item {i}: {data}")
    except Exception as e:
        print(f"Failed to fetch item {i}: {e}")

Сравнение для Data Engineering

ЗадачаБиблиотекаПричина
Простой GET/POSTrequestsПростая, стабильная
Параллельные запросыhttpx asyncАсинхронность, контроль concurrency
Очень высокий throughputaiohttpНаиболее оптимизирована
AWS интеграцияboto3Встроенная поддержка AWS
Google Cloudgoogle-cloud-*Встроенная поддержка GCP
Данные из SaaSспециализированные SDKКанонические SDK (Stripe, Salesforce)

Best Practices

1. Всегда используйте retry с backoff

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def fetch_data(url):
    return requests.get(url).json()

2. Кешируйте результаты API

import hashlib
import json
from pathlib import Path

def fetch_with_cache(url: str, cache_dir: str = '/tmp/api_cache'):
    cache_key = hashlib.md5(url.encode()).hexdigest()
    cache_file = Path(cache_dir) / f"{cache_key}.json"
    
    if cache_file.exists():
        with open(cache_file) as f:
            return json.load(f)
    
    response = requests.get(url)
    data = response.json()
    
    cache_dir_path = Path(cache_dir)
    cache_dir_path.mkdir(exist_ok=True)
    with open(cache_file, 'w') as f:
        json.dump(data, f)
    
    return data

3. Логируйте и мониторьте запросы

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def fetch_with_logging(url: str):
    try:
        logger.info(f"Fetching {url}")
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        logger.info(f"Success: {response.status_code}")
        return response.json()
    except requests.RequestException as e:
        logger.error(f"Failed to fetch {url}: {e}")
        raise

Вывод

Для Data Engineering выбирайте:

  • requests для простых синхронных задач
  • httpx или aiohttp для параллельных запросов
  • Специализированные SDK (boto3, google-cloud) для облачных сервисов
  • Всегда добавляйте retry, кеширование и логирование