← Назад к вопросам

Как синхронизировать Elasticsearch с PostgreSQL?

3.0 Senior🔥 101 комментариев
#Архитектура и паттерны#Базы данных (NoSQL)#Базы данных (SQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как синхронизировать Elasticsearch с PostgreSQL

Подход 1: Write-Through Pattern (обновление при каждом изменении)

Когда обновляем БД, сразу обновляем ES:

from elasticsearch import Elasticsearch
from sqlalchemy.orm import Session

es_client = Elasticsearch(["localhost:9200"])

def create_user(session: Session, name: str, email: str):
    # 1. Создаём пользователя в PostgreSQL
    user = User(name=name, email=email)
    session.add(user)
    session.commit()
    
    # 2. Индексируем в Elasticsearch
    es_client.index(
        index="users",
        id=user.id,
        document={
            "id": user.id,
            "name": user.name,
            "email": user.email
        }
    )
    
    return user

def update_user(session: Session, user_id: int, name: str):
    # 1. Обновляем БД
    user = session.query(User).get(user_id)
    user.name = name
    session.commit()
    
    # 2. Обновляем ES
    es_client.update(
        index="users",
        id=user_id,
        doc={"name": name}
    )

def delete_user(session: Session, user_id: int):
    # 1. Удаляем из БД
    session.query(User).filter(User.id == user_id).delete()
    session.commit()
    
    # 2. Удаляем из ES
    es_client.delete(index="users", id=user_id)

Минусы: если ES падает, потеряем синхронизацию.

Подход 2: Dual Writes (безопаснее)

Оба изменения оборачиваем в транзакцию:

def create_user_safe(session: Session, name: str):
    try:
        # 1. Создаём в БД
        user = User(name=name)
        session.add(user)
        session.commit()
        user_id = user.id
    except Exception as e:
        print(f"DB error: {e}")
        raise
    
    try:
        # 2. Индексируем в ES
        es_client.index(
            index="users",
            id=user_id,
            document={"id": user_id, "name": name}
        )
    except Exception as e:
        # ES fallen, но данные в БД есть
        # Нужна async обработка для восстановления
        print(f"ES error: {e}")
        schedule_retry(user_id)  # Retry позже
        raise

Подход 3: Change Data Capture (CDC) - самый надёжный

Используем логирование изменений PostgreSQL:

import psycopg2
from elasticsearch import Elasticsearch
from psycopg2.extras import LogicalReplicationConnection

class PostgresElasticsearchSync:
    def __init__(self, pg_conn_str, es_host):
        self.pg_conn = psycopg2.connect(pg_conn_str)
        self.es_client = Elasticsearch([es_host])
    
    def sync_from_wal(self):
        """Читаем Write-Ahead Log из PostgreSQL"""
        cur = self.pg_conn.cursor()
        cur.execute("""
            SELECT * FROM users WHERE updated_at > %s
        """, (datetime.utcnow() - timedelta(minutes=5),))
        
        for row in cur:
            self.es_client.index(
                index="users",
                id=row[0],
                document=self.row_to_dict(row)
            )

Подход 4: Message Queue (Kafka/RabbitMQ)

Используем очередь сообщений как промежуточный слой:

import json
from kafka import KafkaProducer, KafkaConsumer
from elasticsearch import Elasticsearch

# Producer (пишет в PostgreSQL и в Kafka)
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode()
)

def create_user_with_queue(session: Session, name: str):
    # 1. Создаём в БД
    user = User(name=name)
    session.add(user)
    session.commit()
    
    # 2. Отправляем событие в очередь
    producer.send("user-events", {
        "type": "user_created",
        "user_id": user.id,
        "name": user.name
    })

# Consumer (слушает очередь и обновляет ES)
consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers=["localhost:9092"],
    value_deserializer=lambda m: json.loads(m.decode())
)

es_client = Elasticsearch(["localhost:9200"])

def consume_and_sync():
    for message in consumer:
        event = message.value
        
        if event["type"] == "user_created":
            es_client.index(
                index="users",
                id=event["user_id"],
                document={"id": event["user_id"], "name": event["name"]}
            )
        elif event["type"] == "user_deleted":
            es_client.delete(index="users", id=event["user_id"])

Подход 5: Celery (для асинхронной синхронизации)

from celery import Celery
from elasticsearch import Elasticsearch

app = Celery()
es_client = Elasticsearch(["localhost:9200"])

# Отправляем задачу в Celery
@app.task
def sync_to_elasticsearch(user_id: int):
    """Синхронизировать пользователя в ES"""
    user = session.query(User).get(user_id)
    
    if user:
        es_client.index(
            index="users",
            id=user.id,
            document={
                "id": user.id,
                "name": user.name,
                "email": user.email
            }
        )
    else:
        es_client.delete(index="users", id=user_id)

# При создании пользователя
def create_user(session: Session, name: str):
    user = User(name=name)
    session.add(user)
    session.commit()
    
    # Асинхронно синхронизируем
    sync_to_elasticsearch.delay(user.id)

Полная синхронизация (переиндексация)

def full_sync_postgres_to_elasticsearch():
    """Переиндексировать всех пользователей"""
    users = session.query(User).all()
    
    # Используем bulk для производительности
    from elasticsearch.helpers import bulk
    
    actions = []
    for user in users:
        actions.append({
            "_index": "users",
            "_id": user.id,
            "_source": {
                "id": user.id,
                "name": user.name,
                "email": user.email
            }
        })
    
    bulk(es_client, actions)

Best Practices

  1. Используй CDC или Message Queue - более надёжно
  2. Bulk операции для производительности
  3. Версионирование в ES - отслеживай изменения
  4. Retry логика - ES может быть недоступен
  5. Обработка ошибок - не теряй данные
  6. Мониторинг - проверяй расхождения
  7. Periodic reconciliation - проверяй синхронизацию

Лучший способ: PostgreSQL + Kafka + Elasticsearch Consumer.

Как синхронизировать Elasticsearch с PostgreSQL? | PrepBro