Какие знаешь запросы в API?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
API запросы в data инженерии
API запросы — критическая часть работы data engineers'а, так как часто нужно интегрировать данные из различных источников через REST, GraphQL или других API. Рассмотрим основные типы запросов и best practices.
1. HTTP методы (REST API)
GET — получение данных
import requests
# Простой GET запрос
response = requests.get('https://api.example.com/users')
data = response.json()
# С параметрами
response = requests.get(
'https://api.example.com/users',
params={
'page': 1,
'limit': 100,
'filter': 'active'
}
)
# С заголовками (например, авторизация)
headers = {'Authorization': 'Bearer TOKEN'}
response = requests.get(
'https://api.example.com/users',
headers=headers
)
POST — создание новых данных
data = {
'name': 'John',
'email': 'john@example.com',
'age': 30
}
response = requests.post(
'https://api.example.com/users',
json=data,
headers={'Authorization': 'Bearer TOKEN'}
)
if response.status_code == 201:
created = response.json()
print(f"Created user: {created['id']}")
PUT — обновление полных данных
user_id = 123
updated_data = {'name': 'Jane', 'age': 31}
response = requests.put(
f'https://api.example.com/users/{user_id}',
json=updated_data
)
PATCH — частичное обновление
# Только обновляем age, name остаётся неизменным
response = requests.patch(
f'https://api.example.com/users/{user_id}',
json={'age': 32}
)
DELETE — удаление
response = requests.delete(
f'https://api.example.com/users/{user_id}',
headers={'Authorization': 'Bearer TOKEN'}
)
2. Обработка ошибок в API запросах
def safe_api_call(url, method='GET', **kwargs):
try:
if method == 'GET':
response = requests.get(url, timeout=10, **kwargs)
elif method == 'POST':
response = requests.post(url, timeout=10, **kwargs)
else:
raise ValueError(f"Unsupported method: {method}")
response.raise_for_status() # Выбросит ошибку для 4xx, 5xx
return response.json()
except requests.ConnectionError:
logger.error(f"Connection error for {url}")
raise
except requests.Timeout:
logger.error(f"Timeout for {url}")
raise
except requests.HTTPError as e:
logger.error(f"HTTP error {e.response.status_code}: {e.response.text}")
raise
except ValueError as e:
logger.error(f"Invalid JSON response: {e}")
raise
3. Пагинация
Offset-based пагинация:
def fetch_all_paginated(url, page_size=100):
all_data = []
page = 0
while True:
response = requests.get(
url,
params={
'offset': page * page_size,
'limit': page_size
}
)
data = response.json()
if not data:
break
all_data.extend(data)
page += 1
return all_data
Cursor-based пагинация (лучше для больших наборов):
def fetch_with_cursor(url):
all_data = []
cursor = None
while True:
params = {'limit': 100}
if cursor:
params['cursor'] = cursor
response = requests.get(url, params=params)
result = response.json()
all_data.extend(result['data'])
if not result.get('next_cursor'):
break
cursor = result['next_cursor']
return all_data
4. Rate limiting
Exponential backoff для обработки rate limits:
import time
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session():
session = requests.Session()
retry = Retry(
total=5,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
session = create_session()
response = session.get('https://api.example.com/data')
5. Batch API запросы
Параллельные запросы с ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def fetch_user_data(user_ids):
def fetch_one(user_id):
response = requests.get(f'https://api.example.com/users/{user_id}')
return response.json()
results = {}
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(fetch_one, uid): uid for uid in user_ids}
for future in as_completed(futures):
user_id = futures[future]
try:
results[user_id] = future.result()
except Exception as e:
logger.error(f"Error fetching user {user_id}: {e}")
return results
Batch endpoint (если API поддерживает):
# Вместо N запросов, отправляем 1
user_ids = [1, 2, 3, 4, 5]
response = requests.post(
'https://api.example.com/batch',
json={
'requests': [
{'method': 'GET', 'url': f'/users/{uid}'}
for uid in user_ids
]
}
)
results = response.json() # Все результаты в одном ответе
6. Работа с GraphQL API
def graphql_query(query_string, variables=None):
headers = {
'Authorization': 'Bearer TOKEN',
'Content-Type': 'application/json'
}
payload = {
'query': query_string,
'variables': variables or {}
}
response = requests.post(
'https://api.example.com/graphql',
json=payload,
headers=headers
)
result = response.json()
if 'errors' in result:
logger.error(f"GraphQL errors: {result['errors']}")
raise Exception(result['errors'])
return result['data']
# Использование
query = """
query GetUser($id: ID!) {
user(id: $id) {
id
name
email
}
}
"""
user = graphql_query(query, variables={'id': '123'})
7. Кэширование API запросов
from functools import lru_cache
import requests
import hashlib
import json
class CachedAPI:
def __init__(self, ttl_seconds=3600):
self.cache = {}
self.ttl = ttl_seconds
self.last_fetch = {}
def get(self, url, params=None):
key = hashlib.md5(
json.dumps({
'url': url,
'params': params
}).encode()
).hexdigest()
import time
now = time.time()
# Проверяем кэш
if key in self.cache:
if now - self.last_fetch[key] < self.ttl:
return self.cache[key]
# Выполняем запрос
response = requests.get(url, params=params)
data = response.json()
# Кэшируем
self.cache[key] = data
self.last_fetch[key] = now
return data
api = CachedAPI(ttl_seconds=3600)
data = api.get('https://api.example.com/users')
8. Streaming больших ответов
def stream_api_response(url):
response = requests.get(url, stream=True)
for chunk in response.iter_lines():
if chunk:
data = json.loads(chunk)
yield data
# Использование
for item in stream_api_response('https://api.example.com/events'):
process_item(item)
9. Работа с файлами через API
# Загрузка файла
with open('data.csv', 'rb') as f:
files = {'file': f}
response = requests.post(
'https://api.example.com/upload',
files=files
)
# Загрузка файла из API
response = requests.get(
'https://api.example.com/download/report.csv',
stream=True
)
with open('downloaded_report.csv', 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
10. Аутентификация в API
Bearer token:
headers = {'Authorization': 'Bearer eyJhbGc...'}
response = requests.get(url, headers=headers)
API Key:
headers = {'X-API-Key': 'your_api_key_here'}
response = requests.get(url, headers=headers)
Basic auth:
from requests.auth import HTTPBasicAuth
response = requests.get(
url,
auth=HTTPBasicAuth('username', 'password')
)
OAuth 2.0:
from requests_oauthlib import OAuth2Session
client_id = 'your_client_id'
client_secret = 'your_client_secret'
oauth = OAuth2Session(
client_id,
redirect_uri='http://localhost:8000/callback',
scope=['user:email']
)
# Получить токен
token = oauth.fetch_token(
'https://api.example.com/oauth/token',
client_secret=client_secret,
authorization_response='...',
)
# Использовать токен
response = oauth.get('https://api.example.com/user')
Best Practices
- Всегда обрабатывай ошибки и логируй их
- Используй таймауты для всех запросов
- Реализуй retry logic для отказоустойчивости
- Кэшируй результаты, если возможно
- Монитори rate limits API
- Документируй API endpoints, которые используешь
- Используй параллельные запросы для ускорения
- Валидируй данные после получения от API
- Вёрсионируй API endpoints (используй /v1/, /v2/)
Выводы
Работу с API запросами необходимо подходить ответственно: правильная обработка ошибок, rate limiting, кэширование и параллелизм. Для data engineers это критические навыки при интеграции данных из различных источников.