Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
UPSERT: UPDATE или INSERT в одной операции
UPSERT (Update or Insert) — это операция которая либо обновляет существующую запись, либо вставляет новую, если она не существует. Это очень полезно для синхронизации данных и избегания проблем с дубликатами.
Синтаксис UPSERT в разных БД
PostgreSQL: ON CONFLICT
-- Базовый синтаксис
INSERT INTO users (id, email, name) VALUES (1, 'alice@example.com', 'Alice')
ON CONFLICT (id) DO UPDATE SET
email = EXCLUDED.email,
name = EXCLUDED.name;
-- EXCLUDED указывает на значения которые пытались вставить
-- По PRIMARY KEY
INSERT INTO users (id, email) VALUES (1, 'alice@example.com')
ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email;
-- По UNIQUE индексу
INSERT INTO users (email, name) VALUES ('alice@example.com', 'Alice')
ON CONFLICT (email) DO UPDATE SET name = EXCLUDED.name;
-- Игнорировать конфликт (ничего не делать)
INSERT INTO users (id, email) VALUES (1, 'alice@example.com')
ON CONFLICT (id) DO NOTHING;
-- С условием
INSERT INTO users (id, email, updated_at) VALUES (1, 'alice@example.com', NOW())
ON CONFLICT (id) DO UPDATE SET
email = EXCLUDED.email,
updated_at = NOW()
WHERE users.updated_at < NOW() - INTERVAL '1 day';
MySQL: ON DUPLICATE KEY UPDATE
-- Базовый синтаксис
INSERT INTO users (id, email, name) VALUES (1, 'alice@example.com', 'Alice')
ON DUPLICATE KEY UPDATE
email = VALUES(email),
name = VALUES(name);
-- Или используй VALUES() функцию
INSERT INTO users (id, email, name) VALUES (1, 'alice@example.com', 'Alice')
ON DUPLICATE KEY UPDATE
email = VALUE(email),
updated_at = NOW();
-- Несколько строк
INSERT INTO users (id, email, name) VALUES
(1, 'alice@example.com', 'Alice'),
(2, 'bob@example.com', 'Bob')
ON DUPLICATE KEY UPDATE
email = VALUES(email),
name = VALUES(name);
SQLite: INSERT OR REPLACE
-- Заменить если существует
INSERT OR REPLACE INTO users (id, email, name)
VALUES (1, 'alice@example.com', 'Alice');
-- Игнорировать если существует
INSERT OR IGNORE INTO users (id, email, name)
VALUES (1, 'alice@example.com', 'Alice');
-- Обновить если существует
INSERT INTO users (id, email, name)
VALUES (1, 'alice@example.com', 'Alice')
ON CONFLICT(id) DO UPDATE SET
email = excluded.email,
name = excluded.name;
SQL Server: MERGE
MERGE INTO users AS target
USING (SELECT 1 AS id, 'alice@example.com' AS email, 'Alice' AS name) AS source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET
email = source.email,
name = source.name
WHEN NOT MATCHED THEN
INSERT (id, email, name)
VALUES (source.id, source.email, source.name);
Практический пример с Python
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.orm import declarative_base, Session
from sqlalchemy.dialects.postgresql import insert
from datetime import datetime
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, unique=True)
name = Column(String)
updated_at = Column(DateTime, default=datetime.utcnow)
engine = create_engine('postgresql://user:pass@localhost/db')
Base.metadata.create_all(engine)
session = Session(engine)
# PostgreSQL UPSERT с SQLAlchemy
stmt = insert(User).values(
id=1,
email='alice@example.com',
name='Alice',
updated_at=datetime.utcnow()
).on_conflict_do_update(
index_elements=['id'],
set_={
'email': 'alice@example.com',
'name': 'Alice',
'updated_at': datetime.utcnow()
}
)
session.execute(stmt)
session.commit()
# Или через ORM
from sqlalchemy import update
user = session.query(User).filter(User.id == 1).first()
if user:
user.email = 'alice@example.com'
user.name = 'Alice'
user.updated_at = datetime.utcnow()
else:
user = User(id=1, email='alice@example.com', name='Alice')
session.add(user)
session.commit()
Реальный пример: синхронизация API данных
import requests
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from typing import List, Dict
engine = create_engine('postgresql://...')
def sync_users_from_api():
"""
Синхронизировать пользователей из внешнего API.
Использует UPSERT для обновления/вставки.
"""
# Получить данные из API
response = requests.get('https://api.example.com/users')
api_users: List[Dict] = response.json()
with Session(engine) as session:
# PostgreSQL UPSERT
from sqlalchemy.dialects.postgresql import insert
for api_user in api_users:
stmt = insert(User).values(
external_id=api_user['id'],
email=api_user['email'],
name=api_user['name'],
api_sync_at=datetime.utcnow()
).on_conflict_do_update(
index_elements=['external_id'],
set_={
'email': api_user['email'],
'name': api_user['name'],
'api_sync_at': datetime.utcnow()
}
)
session.execute(stmt)
session.commit()
print(f"✓ Синхронизирован {len(api_users)} пользователей")
# Запустить синхронизацию
sync_users_from_api()
Проблемы и решения
1. Race conditions
-- ❌ ОПАСНО: несинхронизированные операции
SELECT id FROM users WHERE email = 'alice@example.com';
-- Если найдена: UPDATE
-- Если не найдена: INSERT
-- Между SELECT и INSERT может вставиться другой процесс!
-- ✓ БЕЗОПАСНО: UPSERT атомарен
INSERT INTO users (email, name) VALUES ('alice@example.com', 'Alice')
ON CONFLICT (email) DO UPDATE SET name = EXCLUDED.name;
-- Всё в одной операции, БД гарантирует atomicity
2. Много данных (bulk UPSERT)
# ❌ МЕДЛЕННО: один UPSERT за раз
for user in users:
stmt = insert(User).values(...).on_conflict_do_update(...)
session.execute(stmt)
session.commit()
# ✓ БЫСТРО: batch UPSERT
from sqlalchemy.dialects.postgresql import insert
stmt = insert(User).values([
{'id': 1, 'email': 'alice@example.com', 'name': 'Alice'},
{'id': 2, 'email': 'bob@example.com', 'name': 'Bob'},
{'id': 3, 'email': 'charlie@example.com', 'name': 'Charlie'},
]).on_conflict_do_update(
index_elements=['id'],
set_={
'email': insert(User).excluded.email,
'name': insert(User).excluded.name
}
)
session.execute(stmt)
session.commit()
3. Ограничение при обновлении
# ✓ PostgreSQL: обновлять только если условие выполнено
from sqlalchemy import and_
stmt = insert(User).values(...).on_conflict_do_update(
index_elements=['id'],
set_={'email': 'alice@example.com'},
where=and_(User.is_active == True, User.updated_at < datetime.utcnow())
)
Сравнение подходов
# ПОДХОД 1: SELECT + INSERT + UPDATE (НЕПРАВИЛЬНО)
def sync_user_bad(user_data):
user = session.query(User).filter(User.id == user_data['id']).first()
if user:
user.email = user_data['email']
user.name = user_data['name']
else:
user = User(**user_data)
session.add(user)
session.commit()
# ❌ Race condition: между SELECT и INSERT может вставиться другой процесс
# ПОДХОД 2: TRY INSERT + CATCH DUPLICATE (МЕДЛЕННО)
def sync_user_catch(user_data):
try:
user = User(**user_data)
session.add(user)
session.commit()
except IntegrityError:
session.rollback()
# Обновляем
session.query(User).filter(User.id == user_data['id']).update(user_data)
session.commit()
# Исключение дорогое, медленно
# ПОДХОД 3: UPSERT (ПРАВИЛЬНО и БЫСТРО)
def sync_user_good(user_data):
stmt = insert(User).values(**user_data).on_conflict_do_update(
index_elements=['id'],
set_={k: v for k, v in user_data.items() if k != 'id'}
)
session.execute(stmt)
session.commit()
# ✓ Атомарно, быстро, без race conditions
Примеры использования
1. Кэширование
# Сохранить или обновить результат кэша
from datetime import datetime, timedelta
def cache_api_response(key: str, data: dict, ttl_hours: int = 1):
stmt = insert(Cache).values(
key=key,
data=json.dumps(data),
expires_at=datetime.utcnow() + timedelta(hours=ttl_hours)
).on_conflict_do_update(
index_elements=['key'],
set_={
'data': json.dumps(data),
'expires_at': datetime.utcnow() + timedelta(hours=ttl_hours),
'updated_at': datetime.utcnow()
}
)
session.execute(stmt)
session.commit()
2. Синхронизация профилей
# Обновить профиль пользователя или создать если нет
def update_user_profile(user_id: int, profile_data: dict):
stmt = insert(UserProfile).values(
user_id=user_id,
**profile_data
).on_conflict_do_update(
index_elements=['user_id'],
set_=profile_data
)
session.execute(stmt)
session.commit()
3. Счётчики
# Инкрементировать или создать счётчик
from sqlalchemy import func
def increment_counter(key: str, value: int = 1):
stmt = insert(Counter).values(
key=key,
count=value
).on_conflict_do_update(
index_elements=['key'],
set_={'count': Counter.count + value}
)
session.execute(stmt)
session.commit()
Производительность
-- Время операции (на 1M записей)
SELECT + INSERT/UPDATE: ~2000ms (SELECT + логика + операция)
TRY INSERT + CATCH: ~1500ms (зависит от кол-ва конфликтов)
UPSERT (ON CONFLICT): ~500ms (одна атомарная операция)
UPSERT bulk (batch): ~50ms (для 10k записей)
-- UPSERT в 4-40 раз быстрее!
Ключевые моменты
- UPSERT атомарен (нет race conditions)
- Синтаксис разный в разных БД (ON CONFLICT, ON DUPLICATE KEY, MERGE)
- Эффективен для синхронизации данных
- Batch UPSERT очень быстро для много записей
- Используй индексы на конфликтующих столбцах
- Выбирай UPSERT вместо SELECT + INSERT/UPDATE
- PostgreSQL: ON CONFLICT самый гибкий
- MySQL: ON DUPLICATE KEY UPDATE простой и быстрый