← Назад к вопросам
Можно ли изменять несколько разнотипных объектов методом POST?
1.7 Middle🔥 111 комментариев
#REST API и HTTP
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Несколько объектов в POST запросе
Да, можно изменять несколько разнотипных объектов в одном POST запросе. Это называется batch операция или транзакция.
REST принципы
Официально, REST рекомендует:
- POST для создания (Создать один ресурс)
- PUT/PATCH для полного/частичного обновления
- DELETE для удаления
Но в практике POST часто используется для batch операций, так как это более гибко.
Варианты реализации
1. Массив объектов (Batch endpoint)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
from sqlalchemy.ext.asyncio import AsyncSession
app = FastAPI()
class UserUpdate(BaseModel):
id: int
name: str
email: str
class ProductUpdate(BaseModel):
id: int
price: float
quantity: int
@app.post("/api/v1/batch/update")
async def batch_update(
updates: dict,
db: AsyncSession = Depends(get_db)
):
"""
POST /api/v1/batch/update
{
"users": [{"id": 1, "name": "John", "email": "john@example.com"}],
"products": [{"id": 1, "price": 100.0, "quantity": 50}]
}
"""
results = {"success": [], "errors": []}
try:
# Обновляем пользователей
if "users" in updates:
for user_data in updates["users"]:
user = await db.get(User, user_data["id"])
if not user:
results["errors"].append({
"type": "user",
"id": user_data["id"],
"error": "Not found"
})
continue
user.name = user_data["name"]
user.email = user_data["email"]
results["success"].append({"type": "user", "id": user.id})
# Обновляем продукты
if "products" in updates:
for product_data in updates["products"]:
product = await db.get(Product, product_data["id"])
if not product:
results["errors"].append({
"type": "product",
"id": product_data["id"],
"error": "Not found"
})
continue
product.price = product_data["price"]
product.quantity = product_data["quantity"]
results["success"].append({"type": "product", "id": product.id})
# Транзакция
await db.commit()
return results
except Exception as e:
await db.rollback()
raise HTTPException(status_code=500, detail=str(e))
2. Типизированный Pydantic Union
from typing import Union, List
from pydantic import BaseModel, Field
class UserUpdateRequest(BaseModel):
type: str = "user"
id: int
name: str
email: str
class ProductUpdateRequest(BaseModel):
type: str = "product"
id: int
price: float
class BatchUpdateRequest(BaseModel):
# Используем Union для полиморфизма
updates: List[Union[UserUpdateRequest, ProductUpdateRequest]] = Field(
discriminator='type'
)
@app.post("/api/v1/batch/update-typed")
async def batch_update_typed(
request: BatchUpdateRequest,
db: AsyncSession = Depends(get_db)
):
"""
POST /api/v1/batch/update-typed
{
"updates": [
{"type": "user", "id": 1, "name": "John", "email": "john@example.com"},
{"type": "product", "id": 1, "price": 99.99}
]
}
"""
results = {"success": [], "errors": []}
try:
for update in request.updates:
if isinstance(update, UserUpdateRequest):
user = await db.get(User, update.id)
if user:
user.name = update.name
user.email = update.email
results["success"].append({"type": "user", "id": user.id})
else:
results["errors"].append({"type": "user", "id": update.id})
elif isinstance(update, ProductUpdateRequest):
product = await db.get(Product, update.id)
if product:
product.price = update.price
results["success"].append({"type": "product", "id": product.id})
else:
results["errors"].append({"type": "product", "id": update.id})
await db.commit()
return results
except Exception as e:
await db.rollback()
raise HTTPException(status_code=500, detail=str(e))
3. Отдельные операции (более идиоматично для REST)
# Вместо одного батч эндпоинта - несколько специализированных
@app.post("/api/v1/users/batch")
async def update_users(
updates: List[UserUpdate],
db: AsyncSession = Depends(get_db)
):
"""Только пользователи"""
for user_data in updates:
user = await db.get(User, user_data.id)
if user:
user.name = user_data.name
user.email = user_data.email
await db.commit()
return {"updated": len(updates)}
@app.post("/api/v1/products/batch")
async def update_products(
updates: List[ProductUpdate],
db: AsyncSession = Depends(get_db)
):
"""Только продукты"""
for product_data in updates:
product = await db.get(Product, product_data.id)
if product:
product.price = product_data.price
await db.commit()
return {"updated": len(updates)}
Транзакции
# Критично использовать транзакции для batch операций
# Либо всё обновляется, либо ничего
class BatchTransactionService:
def __init__(self, db: AsyncSession):
self.db = db
async def update_multiple(
self,
user_updates: List[UserUpdate],
product_updates: List[ProductUpdate]
):
try:
# Все обновления в одной транзакции
for user_data in user_updates:
user = await self.db.get(User, user_data.id)
if user:
user.name = user_data.name
user.email = user_data.email
for product_data in product_updates:
product = await self.db.get(Product, product_data.id)
if product:
product.price = product_data.price
# Commit всего сразу
await self.db.commit()
return {"status": "success", "updates": len(user_updates) + len(product_updates)}
except Exception as e:
# Откатываем всё если ошибка
await self.db.rollback()
raise
Обработка ошибок
class BatchUpdateResponse(BaseModel):
status: str
success: List[dict]
errors: List[dict]
partial: bool # True если часть успешно, часть ошибка
@app.post("/api/v1/batch/update-safe")
async def batch_update_safe(
updates: dict,
db: AsyncSession = Depends(get_db)
) -> BatchUpdateResponse:
"""
Батч с обработкой ошибок
- Если ошибка в одном элементе, остальные всё равно обновляются
- Возвращает список успехов и ошибок
"""
success = []
errors = []
async with db.begin(): # Transaction
try:
if "users" in updates:
for user_data in updates["users"]:
try:
user = await db.get(User, user_data["id"])
if not user:
errors.append({
"type": "user",
"id": user_data["id"],
"error": "User not found"
})
continue
user.name = user_data.get("name", user.name)
user.email = user_data.get("email", user.email)
success.append({"type": "user", "id": user.id})
except Exception as e:
errors.append({
"type": "user",
"id": user_data.get("id"),
"error": str(e)
})
# Аналогично для других типов
await db.commit()
except Exception as e:
await db.rollback()
return BatchUpdateResponse(
status="error",
success=success,
errors=errors + [{"error": str(e)}],
partial=len(success) > 0
)
return BatchUpdateResponse(
status="success" if not errors else "partial",
success=success,
errors=errors,
partial=len(errors) > 0
)
REST Design: когда использовать POST vs PATCH
ОПЕРАЦИЯ HTTP METHOD ENDPOINT
─────────────────────────────────────────────
Создать одно POST /users
Обновить одно PATCH/PUT /users/{id}
Удалить одно DELETE /users/{id}
Создать много POST /users/batch (батч)
Обновить много PATCH /users (с фильтром) или POST /batch
Удалить много DELETE /users (с фильтром) или POST /batch/delete
Лучшие практики
- Используй транзакции - либо всё, либо ничего
- Возвращай детальный ответ - какие успехи, какие ошибки
- Помечай тип объекта - в ответе указывай что обновилось
- Ограничивай размер батча - например, максимум 1000 объектов
- Документируй - какие типы можно обновлять вместе
# ✅ Хорошо
@app.post("/api/v1/batch/update")
async def batch_update(
request: BatchUpdateRequest,
db: AsyncSession = Depends(get_db)
):
# Логирование
logger.info(f"Batch update: {len(request.updates)} items")
# Валидация размера
if len(request.updates) > 1000:
raise HTTPException(
status_code=400,
detail="Batch size must be <= 1000"
)
# Обновление с транзакцией
# ...
Итого
- Да, можно обновлять несколько разнотипных объектов в POST
- Используй батч эндпоинты для большого количества обновлений
- Всегда используй транзакции - atomicity важна
- Возвращай подробный ответ - какие успехи, какие ошибки
- Лучше отдельные эндпоинты для каждого типа (более REST идиоматично)