← Назад к вопросам

Как реализуешь хранилище сообщений?

1.0 Junior🔥 71 комментариев
#Микросервисы и архитектура

Комментарии (1)

🐱
deepseek-v3.2PrepBro AI5 апр. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Архитектура хранилища сообщений

Как Senior Go-разработчик, я подхожу к проектированию хранилища сообщений комплексно, учитывая требования к производительности, масштабируемости и отказоустойчивости. Моя реализация включает несколько слоёв абстракции и оптимизаций.

Основные архитектурные решения

Выбор базовой СУБД зависит от требований:

  • Для простых случаев подходит PostgreSQL с JSONB полями
  • Для high-load систем предпочитаю Cassandra или ScyllaDB из-за горизонтального масштабирования
  • Для кэширования быстрых операций - Redis с TTL

Схема данных в реляционной БД выглядит так:

CREATE TABLE messages (
    id BIGSERIAL PRIMARY KEY,
    conversation_id UUID NOT NULL,
    sender_id UUID NOT NULL,
    content TEXT NOT NULL,
    content_type VARCHAR(20) DEFAULT 'text',
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    deleted_at TIMESTAMPTZ,
    INDEX idx_conversation_created (conversation_id, created_at DESC)
);

Реализация на Go

Структура проекта следует принципам чистой архитектуры:

internal/
├── domain/
│   └── message.go
├── repository/
│   └── message_repository.go
├── service/
│   └── message_service.go
└── delivery/
    └── http/
        └── message_handler.go

Доменный слой определяет основную бизнес-логику:

package domain

import (
    "time"
    "github.com/google/uuid"
)

type Message struct {
    ID             uuid.UUID  `json:"id"`
    ConversationID uuid.UUID  `json:"conversation_id"`
    SenderID       uuid.UUID  `json:"sender_id"`
    Content        string     `json:"content"`
    ContentType    string     `json:"content_type"`
    Metadata       []byte     `json:"metadata"`
    CreatedAt      time.Time  `json:"created_at"`
    UpdatedAt      time.Time  `json:"updated_at"`
    DeletedAt      *time.Time `json:"deleted_at,omitempty"`
}

type MessageRepository interface {
    Create(message *Message) error
    GetByID(id uuid.UUID) (*Message, error)
    GetByConversation(conversationID uuid.UUID, limit, offset int) ([]*Message, error)
    Update(message *Message) error
    SoftDelete(id uuid.UUID) error
    MarkAsDelivered(messageID, userID uuid.UUID) error
}

Реализация репозитория с использованием SQL:

package repository

import (
    "context"
    "database/sql"
    "fmt"
    "github.com/jmoiron/sqlx"
    "your-project/internal/domain"
)

type PostgresMessageRepository struct {
    db *sqlx.DB
}

func (r *PostgresMessageRepository) Create(ctx context.Context, msg *domain.Message) error {
    query := `INSERT INTO messages 
              (id, conversation_id, sender_id, content, content_type, metadata) 
              VALUES ($1, $2, $3, $4, $5, $6)`
    
    _, err := r.db.ExecContext(ctx, query,
        msg.ID,
        msg.ConversationID,
        msg.SenderID,
        msg.Content,
        msg.ContentType,
        msg.Metadata,
    )
    return err
}

func (r *PostgresMessageRepository) GetByConversation(
    ctx context.Context, 
    conversationID uuid.UUID, 
    limit, offset int,
) ([]*domain.Message, error) {
    var messages []*domain.Message
    
    query := `SELECT * FROM messages 
              WHERE conversation_id = $1 AND deleted_at IS NULL
              ORDER BY created_at DESC 
              LIMIT $2 OFFSET $3`
    
    err := r.db.SelectContext(ctx, &messages, query, conversationID, limit, offset)
    return messages, err
}

Критические оптимизации

Проблема N+1 запроса при загрузке сообщений решается:

// Пакетная загрузка вложений вместо отдельных запросов
func (r *PostgresMessageRepository) GetWithAttachments(
    ctx context.Context, 
    messageIDs []uuid.UUID,
) (map[uuid.UUID][]Attachment, error) {
    // Загрузка всех вложений одним запросом
}

Кэширование частозапрашиваемых данных:

type CachedMessageRepository struct {
    cache *redis.Client
    repo  domain.MessageRepository
}

func (r *CachedMessageRepository) GetByID(ctx context.Context, id uuid.UUID) (*domain.Message, error) {
    cacheKey := fmt.Sprintf("message:%s", id)
    
    // Пробуем получить из кэша
    if cached, err := r.cache.Get(ctx, cacheKey).Bytes(); err == nil {
        var msg domain.Message
        if json.Unmarshal(cached, &msg) == nil {
            return &msg, nil
        }
    }
    
    // Fallback к основной БД
    msg, err := r.repo.GetByID(ctx, id)
    if err != nil {
        return nil, err
    }
    
    // Сохраняем в кэш на 5 минут
    data, _ := json.Marshal(msg)
    r.cache.Set(ctx, cacheKey, data, 5*time.Minute)
    
    return msg, nil
}

Обработка высоких нагрузок

Для масштабирования применяю:

  • Шардирование по conversation_id
  • Асинхронную обработку через RabbitMQ/Kafka
  • Read Replicas для распределения запросов на чтение
  • Материализованные представления для агрегаций

Тестирование

func TestMessageRepository(t *testing.T) {
    repo := NewPostgresMessageRepository(testDB)
    
    t.Run("Create and retrieve message", func(t *testing.T) {
        msg := &domain.Message{
            ID:             uuid.New(),
            ConversationID: uuid.New(),
            SenderID:       uuid.New(),
            Content:        "Test message",
        }
        
        err := repo.Create(context.Background(), msg)
        require.NoError(t, err)
        
        retrieved, err := repo.GetByID(context.Background(), msg.ID)
        require.NoError(t, err)
        require.Equal(t, msg.Content, retrieved.Content)
    })
}

Мониторинг и метрики

Интегрирую Prometheus метрики для отслеживания:

  • Latency персистентных операций
  • Частоту обращений к кэшу
  • Ошибки при работе с БД

Такой подход обеспечивает масштабируемое, отказоустойчивое хранилище, способное обрабатывать от сотен до миллионов сообщений в секунду в зависимости от выбранной архитектуры и инстанцирования.