← Назад к вопросам

Что такое оператор UPSERT?

2.0 Middle🔥 111 комментариев
#Python Core#Soft Skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

UPSERT: UPDATE или INSERT в одной операции

UPSERT (Update or Insert) — это операция которая либо обновляет существующую запись, либо вставляет новую, если она не существует. Это очень полезно для синхронизации данных и избегания проблем с дубликатами.

Синтаксис UPSERT в разных БД

PostgreSQL: ON CONFLICT

-- Базовый синтаксис
INSERT INTO users (id, email, name) VALUES (1, 'alice@example.com', 'Alice')
ON CONFLICT (id) DO UPDATE SET
    email = EXCLUDED.email,
    name = EXCLUDED.name;

-- EXCLUDED указывает на значения которые пытались вставить

-- По PRIMARY KEY
INSERT INTO users (id, email) VALUES (1, 'alice@example.com')
ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email;

-- По UNIQUE индексу
INSERT INTO users (email, name) VALUES ('alice@example.com', 'Alice')
ON CONFLICT (email) DO UPDATE SET name = EXCLUDED.name;

-- Игнорировать конфликт (ничего не делать)
INSERT INTO users (id, email) VALUES (1, 'alice@example.com')
ON CONFLICT (id) DO NOTHING;

-- С условием
INSERT INTO users (id, email, updated_at) VALUES (1, 'alice@example.com', NOW())
ON CONFLICT (id) DO UPDATE SET 
    email = EXCLUDED.email,
    updated_at = NOW()
WHERE users.updated_at < NOW() - INTERVAL '1 day';

MySQL: ON DUPLICATE KEY UPDATE

-- Базовый синтаксис
INSERT INTO users (id, email, name) VALUES (1, 'alice@example.com', 'Alice')
ON DUPLICATE KEY UPDATE
    email = VALUES(email),
    name = VALUES(name);

-- Или используй VALUES() функцию
INSERT INTO users (id, email, name) VALUES (1, 'alice@example.com', 'Alice')
ON DUPLICATE KEY UPDATE
    email = VALUE(email),
    updated_at = NOW();

-- Несколько строк
INSERT INTO users (id, email, name) VALUES 
    (1, 'alice@example.com', 'Alice'),
    (2, 'bob@example.com', 'Bob')
ON DUPLICATE KEY UPDATE
    email = VALUES(email),
    name = VALUES(name);

SQLite: INSERT OR REPLACE

-- Заменить если существует
INSERT OR REPLACE INTO users (id, email, name)
VALUES (1, 'alice@example.com', 'Alice');

-- Игнорировать если существует
INSERT OR IGNORE INTO users (id, email, name)
VALUES (1, 'alice@example.com', 'Alice');

-- Обновить если существует
INSERT INTO users (id, email, name)
VALUES (1, 'alice@example.com', 'Alice')
ON CONFLICT(id) DO UPDATE SET 
    email = excluded.email,
    name = excluded.name;

SQL Server: MERGE

MERGE INTO users AS target
USING (SELECT 1 AS id, 'alice@example.com' AS email, 'Alice' AS name) AS source
ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET
        email = source.email,
        name = source.name
WHEN NOT MATCHED THEN
    INSERT (id, email, name)
    VALUES (source.id, source.email, source.name);

Практический пример с Python

from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.orm import declarative_base, Session
from sqlalchemy.dialects.postgresql import insert
from datetime import datetime

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True)
    name = Column(String)
    updated_at = Column(DateTime, default=datetime.utcnow)

engine = create_engine('postgresql://user:pass@localhost/db')
Base.metadata.create_all(engine)

session = Session(engine)

# PostgreSQL UPSERT с SQLAlchemy
stmt = insert(User).values(
    id=1,
    email='alice@example.com',
    name='Alice',
    updated_at=datetime.utcnow()
).on_conflict_do_update(
    index_elements=['id'],
    set_={
        'email': 'alice@example.com',
        'name': 'Alice',
        'updated_at': datetime.utcnow()
    }
)
session.execute(stmt)
session.commit()

# Или через ORM
from sqlalchemy import update

user = session.query(User).filter(User.id == 1).first()
if user:
    user.email = 'alice@example.com'
    user.name = 'Alice'
    user.updated_at = datetime.utcnow()
else:
    user = User(id=1, email='alice@example.com', name='Alice')
    session.add(user)
session.commit()

Реальный пример: синхронизация API данных

import requests
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from typing import List, Dict

engine = create_engine('postgresql://...')

def sync_users_from_api():
    """
    Синхронизировать пользователей из внешнего API.
    Использует UPSERT для обновления/вставки.
    """
    # Получить данные из API
    response = requests.get('https://api.example.com/users')
    api_users: List[Dict] = response.json()
    
    with Session(engine) as session:
        # PostgreSQL UPSERT
        from sqlalchemy.dialects.postgresql import insert
        
        for api_user in api_users:
            stmt = insert(User).values(
                external_id=api_user['id'],
                email=api_user['email'],
                name=api_user['name'],
                api_sync_at=datetime.utcnow()
            ).on_conflict_do_update(
                index_elements=['external_id'],
                set_={
                    'email': api_user['email'],
                    'name': api_user['name'],
                    'api_sync_at': datetime.utcnow()
                }
            )
            session.execute(stmt)
        
        session.commit()
        print(f"✓ Синхронизирован {len(api_users)} пользователей")

# Запустить синхронизацию
sync_users_from_api()

Проблемы и решения

1. Race conditions

-- ❌ ОПАСНО: несинхронизированные операции
SELECT id FROM users WHERE email = 'alice@example.com';
-- Если найдена: UPDATE
-- Если не найдена: INSERT
-- Между SELECT и INSERT может вставиться другой процесс!

-- ✓ БЕЗОПАСНО: UPSERT атомарен
INSERT INTO users (email, name) VALUES ('alice@example.com', 'Alice')
ON CONFLICT (email) DO UPDATE SET name = EXCLUDED.name;
-- Всё в одной операции, БД гарантирует atomicity

2. Много данных (bulk UPSERT)

# ❌ МЕДЛЕННО: один UPSERT за раз
for user in users:
    stmt = insert(User).values(...).on_conflict_do_update(...)
    session.execute(stmt)
session.commit()

# ✓ БЫСТРО: batch UPSERT
from sqlalchemy.dialects.postgresql import insert

stmt = insert(User).values([
    {'id': 1, 'email': 'alice@example.com', 'name': 'Alice'},
    {'id': 2, 'email': 'bob@example.com', 'name': 'Bob'},
    {'id': 3, 'email': 'charlie@example.com', 'name': 'Charlie'},
]).on_conflict_do_update(
    index_elements=['id'],
    set_={
        'email': insert(User).excluded.email,
        'name': insert(User).excluded.name
    }
)
session.execute(stmt)
session.commit()

3. Ограничение при обновлении

# ✓ PostgreSQL: обновлять только если условие выполнено
from sqlalchemy import and_

stmt = insert(User).values(...).on_conflict_do_update(
    index_elements=['id'],
    set_={'email': 'alice@example.com'},
    where=and_(User.is_active == True, User.updated_at < datetime.utcnow())
)

Сравнение подходов

# ПОДХОД 1: SELECT + INSERT + UPDATE (НЕПРАВИЛЬНО)
def sync_user_bad(user_data):
    user = session.query(User).filter(User.id == user_data['id']).first()
    if user:
        user.email = user_data['email']
        user.name = user_data['name']
    else:
        user = User(**user_data)
        session.add(user)
    session.commit()
    # ❌ Race condition: между SELECT и INSERT может вставиться другой процесс

# ПОДХОД 2: TRY INSERT + CATCH DUPLICATE (МЕДЛЕННО)
def sync_user_catch(user_data):
    try:
        user = User(**user_data)
        session.add(user)
        session.commit()
    except IntegrityError:
        session.rollback()
        # Обновляем
        session.query(User).filter(User.id == user_data['id']).update(user_data)
        session.commit()
    # Исключение дорогое, медленно

# ПОДХОД 3: UPSERT (ПРАВИЛЬНО и БЫСТРО)
def sync_user_good(user_data):
    stmt = insert(User).values(**user_data).on_conflict_do_update(
        index_elements=['id'],
        set_={k: v for k, v in user_data.items() if k != 'id'}
    )
    session.execute(stmt)
    session.commit()
    # ✓ Атомарно, быстро, без race conditions

Примеры использования

1. Кэширование

# Сохранить или обновить результат кэша
from datetime import datetime, timedelta

def cache_api_response(key: str, data: dict, ttl_hours: int = 1):
    stmt = insert(Cache).values(
        key=key,
        data=json.dumps(data),
        expires_at=datetime.utcnow() + timedelta(hours=ttl_hours)
    ).on_conflict_do_update(
        index_elements=['key'],
        set_={
            'data': json.dumps(data),
            'expires_at': datetime.utcnow() + timedelta(hours=ttl_hours),
            'updated_at': datetime.utcnow()
        }
    )
    session.execute(stmt)
    session.commit()

2. Синхронизация профилей

# Обновить профиль пользователя или создать если нет
def update_user_profile(user_id: int, profile_data: dict):
    stmt = insert(UserProfile).values(
        user_id=user_id,
        **profile_data
    ).on_conflict_do_update(
        index_elements=['user_id'],
        set_=profile_data
    )
    session.execute(stmt)
    session.commit()

3. Счётчики

# Инкрементировать или создать счётчик
from sqlalchemy import func

def increment_counter(key: str, value: int = 1):
    stmt = insert(Counter).values(
        key=key,
        count=value
    ).on_conflict_do_update(
        index_elements=['key'],
        set_={'count': Counter.count + value}
    )
    session.execute(stmt)
    session.commit()

Производительность

-- Время операции (на 1M записей)

SELECT + INSERT/UPDATE:     ~2000ms (SELECT + логика + операция)
TRY INSERT + CATCH:         ~1500ms (зависит от кол-ва конфликтов)
UPSERT (ON CONFLICT):       ~500ms  (одна атомарная операция)
UPSERT bulk (batch):        ~50ms   (для 10k записей)

-- UPSERT в 4-40 раз быстрее!

Ключевые моменты

  • UPSERT атомарен (нет race conditions)
  • Синтаксис разный в разных БД (ON CONFLICT, ON DUPLICATE KEY, MERGE)
  • Эффективен для синхронизации данных
  • Batch UPSERT очень быстро для много записей
  • Используй индексы на конфликтующих столбцах
  • Выбирай UPSERT вместо SELECT + INSERT/UPDATE
  • PostgreSQL: ON CONFLICT самый гибкий
  • MySQL: ON DUPLICATE KEY UPDATE простой и быстрый