Какие библиотеки использовал для работы с API?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Библиотеки для работы с API в Data Engineering
Data Engineer часто работает с API для извлечения данных из внешних сервисов (SaaS, публичные API, микросервисы). Правильный выбор библиотеки влияет на надежность, производительность и удобство поддержки пайплайна.
HTTP-клиенты
1. Requests (синхронный)
Requests — самая популярная библиотека для HTTP запросов в Python.
import requests
import json
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def requests_basic():
"""Базовое использование"""
# GET запрос
response = requests.get('https://api.github.com/users/octocat')
# Проверяем статус
if response.status_code == 200:
data = response.json() # Автоматически парсит JSON
print(f"User: {data['login']}")
else:
print(f"Error: {response.status_code}")
def requests_with_retry():
"""С автоматическим retry при сбое сети"""
session = requests.Session()
# Стратегия retry
retry_strategy = Retry(
total=3, # Максимум 3 попытки
backoff_factor=1, # Ждём 1s, 2s, 4s
status_forcelist=[429, 500, 502, 503, 504], # Retry на эти коды
method_whitelist=["HEAD", "GET", "OPTIONS"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
response = session.get('https://api.example.com/data')
return response.json()
def requests_with_auth():
"""С авторизацией"""
# Bearer token
headers = {
'Authorization': 'Bearer YOUR_API_KEY',
'Content-Type': 'application/json'
}
response = requests.get(
'https://api.example.com/users',
headers=headers
)
# Или Basic Auth
response = requests.get(
'https://api.example.com/data',
auth=('username', 'password')
)
return response.json()
def requests_post_with_data():
"""POST запрос с данными"""
payload = {
'name': 'Alice',
'email': 'alice@example.com'
}
response = requests.post(
'https://api.example.com/users',
json=payload, # Автоматически сериализует в JSON
headers={'Authorization': 'Bearer TOKEN'}
)
return response.json()
# Проблемы requests:
# - Синхронный (блокирует поток)
# - Медленно для 1000+ параллельных запросов
Плюсы: простая API, стабильная, хорошая документация Минусы: синхронный (не для высокого throughput)
2. HTTPX (синхронный и асинхронный)
HTTPX — современная замена Requests с поддержкой async.
import httpx
import asyncio
# Синхронное использование (как requests)
def httpx_sync():
client = httpx.Client()
response = client.get('https://api.github.com/users/octocat')
return response.json()
# Асинхронное использование (для высокого throughput)
async def httpx_async():
async with httpx.AsyncClient() as client:
response = await client.get('https://api.example.com/users/1')
return response.json()
# Множественные параллельные запросы
async def fetch_multiple_users():
"""Параллельно загружает 1000 пользователей"""
async with httpx.AsyncClient() as client:
tasks = [
client.get(f'https://api.example.com/users/{i}')
for i in range(1, 1001)
]
responses = await asyncio.gather(*tasks)
return [r.json() for r in responses]
# Запустить асинхронный код
data = asyncio.run(fetch_multiple_users())
print(f"Loaded {len(data)} users")
# Retry с exponential backoff
from httpx import Client
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
def get_with_retry(url: str):
client = httpx.Client()
response = client.get(url)
response.raise_for_status()
return response.json()
Плюсы: поддержка async, современная, быстрая Минусы: менее популярна чем requests
3. aiohttp (асинхронный)
aiohttp — специализирована на async, более низкоуровневая.
import aiohttp
import asyncio
async def aiohttp_example():
"""Асинхронные запросы через aiohttp"""
async with aiohttp.ClientSession() as session:
async with session.get('https://api.github.com/users/octocat') as response:
data = await response.json()
print(data)
# Параллельные запросы с aiohttp
async def fetch_all_users():
async with aiohttp.ClientSession() as session:
tasks = [
session.get(f'https://api.example.com/users/{i}')
for i in range(1, 101)
]
responses = await asyncio.gather(*tasks)
return [await r.json() for r in responses]
asyncio.run(fetch_all_users())
Плюсы: очень быстрая, хорошо для огромного throughput Минусы: сложнее чем requests, требует знания async/await
Специализированные API-клиенты
1. AWS SDK (boto3)
import boto3
# S3 операции
s3 = boto3.client('s3')
# Upload file
s3.upload_file('local_file.txt', 'my-bucket', 'remote_file.txt')
# Download file
s3.download_file('my-bucket', 'remote_file.txt', 'local_file.txt')
# List objects
response = s3.list_objects_v2(Bucket='my-bucket', Prefix='data/')
for obj in response.get('Contents', []):
print(obj['Key'], obj['Size'])
# Redshift queries
redshift = boto3.client('redshift-data')
response = redshift.execute_statement(
ClusterIdentifier='my-cluster',
Database='mydb',
Sql='SELECT COUNT(*) FROM users'
)
statement_id = response['Id']
result = redshift.get_statement_result(Id=statement_id)
print(result['Records'])
2. Google Cloud Client Libraries
from google.cloud import bigquery
from google.cloud import storage
# BigQuery запросы
client = bigquery.Client()
query = """
SELECT name, email FROM `project.dataset.users`
WHERE country = 'US'
LIMIT 1000
"""
query_job = client.query(query)
for row in query_job:
print(f"{row.name}: {row.email}")
# Cloud Storage
storage_client = storage.Client()
bucket = storage_client.bucket('my-bucket')
blob = bucket.blob('data/file.parquet')
blob.upload_from_filename('local_file.parquet')
3. Snowflake Python Connector
import snowflake.connector
# Подключение
conn = snowflake.connector.connect(
user='user@example.com',
password='password',
account='xy12345', # Account identifier
warehouse='compute_wh',
database='mydb',
schema='public'
)
cursor = conn.cursor()
# Запрос
cursor.execute('SELECT * FROM users LIMIT 10')
for row in cursor.fetchall():
print(row)
# Bulk insert
cursor.execute('CREATE TEMP TABLE staging_users (id INT, name VARCHAR)')
cursor.executemany(
'INSERT INTO staging_users VALUES (?, ?)',
[(1, 'Alice'), (2, 'Bob'), (3, 'Carol')]
)
conn.commit()
conn.close()
Работа с REST API (дата engineering специфик)
Пример: ETL из REST API в Data Warehouse
import httpx
import pandas as pd
from datetime import datetime, timedelta
import logging
class APIDataExtractor:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.headers = {'Authorization': f'Bearer {api_key}'}
self.logger = logging.getLogger(__name__)
def fetch_paginated_data(self, endpoint: str, page_size: int = 1000):
"""Загружает все данные с пагинацией"""
all_records = []
page = 1
with httpx.Client(headers=self.headers) as client:
while True:
url = f"{self.base_url}/{endpoint}"
params = {
'page': page,
'page_size': page_size,
'sort': 'created_at desc'
}
try:
response = client.get(url, params=params, timeout=30.0)
response.raise_for_status()
data = response.json()
records = data.get('data', [])
if not records:
break
all_records.extend(records)
self.logger.info(f"Loaded page {page}: {len(records)} records")
page += 1
except httpx.TimeoutException:
self.logger.error(f"Timeout on page {page}")
break
except httpx.HTTPStatusError as e:
self.logger.error(f"HTTP error {e.response.status_code}: {e.response.text}")
break
return all_records
def extract_to_dataframe(self, endpoint: str) -> pd.DataFrame:
"""Загружает данные в pandas DataFrame"""
records = self.fetch_paginated_data(endpoint)
df = pd.DataFrame(records)
# Нормализуем типы
df['created_at'] = pd.to_datetime(df['created_at'])
df['updated_at'] = pd.to_datetime(df['updated_at'])
return df
# Использование
extractor = APIDataExtractor(
base_url='https://api.example.com/v1',
api_key='YOUR_API_KEY'
)
# Загружаем данные
users_df = extractor.extract_to_dataframe('users')
print(f"Loaded {len(users_df)} users")
# Сохраняем в файл или БД
users_df.to_parquet('s3://my-bucket/raw/users/latest.parquet')
Rate Limiting и Backoff
import httpx
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
import time
class RateLimitedAPIClient:
def __init__(self, requests_per_second: float = 10):
self.requests_per_second = requests_per_second
self.min_interval = 1.0 / requests_per_second
self.last_request_time = 0
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=1, max=60),
retry=retry_if_exception_type((httpx.HTTPError,))
)
def get(self, url: str) -> dict:
# Rate limiting
elapsed = time.time() - self.last_request_time
if elapsed < self.min_interval:
time.sleep(self.min_interval - elapsed)
self.last_request_time = time.time()
client = httpx.Client()
response = client.get(url, timeout=30.0)
response.raise_for_status()
return response.json()
# Использование
client = RateLimitedAPIClient(requests_per_second=5)
for i in range(100):
try:
data = client.get(f'https://api.example.com/items/{i}')
print(f"Item {i}: {data}")
except Exception as e:
print(f"Failed to fetch item {i}: {e}")
Сравнение для Data Engineering
| Задача | Библиотека | Причина |
|---|---|---|
| Простой GET/POST | requests | Простая, стабильная |
| Параллельные запросы | httpx async | Асинхронность, контроль concurrency |
| Очень высокий throughput | aiohttp | Наиболее оптимизирована |
| AWS интеграция | boto3 | Встроенная поддержка AWS |
| Google Cloud | google-cloud-* | Встроенная поддержка GCP |
| Данные из SaaS | специализированные SDK | Канонические SDK (Stripe, Salesforce) |
Best Practices
1. Всегда используйте retry с backoff
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
def fetch_data(url):
return requests.get(url).json()
2. Кешируйте результаты API
import hashlib
import json
from pathlib import Path
def fetch_with_cache(url: str, cache_dir: str = '/tmp/api_cache'):
cache_key = hashlib.md5(url.encode()).hexdigest()
cache_file = Path(cache_dir) / f"{cache_key}.json"
if cache_file.exists():
with open(cache_file) as f:
return json.load(f)
response = requests.get(url)
data = response.json()
cache_dir_path = Path(cache_dir)
cache_dir_path.mkdir(exist_ok=True)
with open(cache_file, 'w') as f:
json.dump(data, f)
return data
3. Логируйте и мониторьте запросы
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def fetch_with_logging(url: str):
try:
logger.info(f"Fetching {url}")
response = requests.get(url, timeout=10)
response.raise_for_status()
logger.info(f"Success: {response.status_code}")
return response.json()
except requests.RequestException as e:
logger.error(f"Failed to fetch {url}: {e}")
raise
Вывод
Для Data Engineering выбирайте:
- requests для простых синхронных задач
- httpx или aiohttp для параллельных запросов
- Специализированные SDK (boto3, google-cloud) для облачных сервисов
- Всегда добавляйте retry, кеширование и логирование