Как реализуешь хранилище сообщений?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Архитектура хранилища сообщений
Как Senior Go-разработчик, я подхожу к проектированию хранилища сообщений комплексно, учитывая требования к производительности, масштабируемости и отказоустойчивости. Моя реализация включает несколько слоёв абстракции и оптимизаций.
Основные архитектурные решения
Выбор базовой СУБД зависит от требований:
- Для простых случаев подходит PostgreSQL с JSONB полями
- Для high-load систем предпочитаю Cassandra или ScyllaDB из-за горизонтального масштабирования
- Для кэширования быстрых операций - Redis с TTL
Схема данных в реляционной БД выглядит так:
CREATE TABLE messages (
id BIGSERIAL PRIMARY KEY,
conversation_id UUID NOT NULL,
sender_id UUID NOT NULL,
content TEXT NOT NULL,
content_type VARCHAR(20) DEFAULT 'text',
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
deleted_at TIMESTAMPTZ,
INDEX idx_conversation_created (conversation_id, created_at DESC)
);
Реализация на Go
Структура проекта следует принципам чистой архитектуры:
internal/
├── domain/
│ └── message.go
├── repository/
│ └── message_repository.go
├── service/
│ └── message_service.go
└── delivery/
└── http/
└── message_handler.go
Доменный слой определяет основную бизнес-логику:
package domain
import (
"time"
"github.com/google/uuid"
)
type Message struct {
ID uuid.UUID `json:"id"`
ConversationID uuid.UUID `json:"conversation_id"`
SenderID uuid.UUID `json:"sender_id"`
Content string `json:"content"`
ContentType string `json:"content_type"`
Metadata []byte `json:"metadata"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}
type MessageRepository interface {
Create(message *Message) error
GetByID(id uuid.UUID) (*Message, error)
GetByConversation(conversationID uuid.UUID, limit, offset int) ([]*Message, error)
Update(message *Message) error
SoftDelete(id uuid.UUID) error
MarkAsDelivered(messageID, userID uuid.UUID) error
}
Реализация репозитория с использованием SQL:
package repository
import (
"context"
"database/sql"
"fmt"
"github.com/jmoiron/sqlx"
"your-project/internal/domain"
)
type PostgresMessageRepository struct {
db *sqlx.DB
}
func (r *PostgresMessageRepository) Create(ctx context.Context, msg *domain.Message) error {
query := `INSERT INTO messages
(id, conversation_id, sender_id, content, content_type, metadata)
VALUES ($1, $2, $3, $4, $5, $6)`
_, err := r.db.ExecContext(ctx, query,
msg.ID,
msg.ConversationID,
msg.SenderID,
msg.Content,
msg.ContentType,
msg.Metadata,
)
return err
}
func (r *PostgresMessageRepository) GetByConversation(
ctx context.Context,
conversationID uuid.UUID,
limit, offset int,
) ([]*domain.Message, error) {
var messages []*domain.Message
query := `SELECT * FROM messages
WHERE conversation_id = $1 AND deleted_at IS NULL
ORDER BY created_at DESC
LIMIT $2 OFFSET $3`
err := r.db.SelectContext(ctx, &messages, query, conversationID, limit, offset)
return messages, err
}
Критические оптимизации
Проблема N+1 запроса при загрузке сообщений решается:
// Пакетная загрузка вложений вместо отдельных запросов
func (r *PostgresMessageRepository) GetWithAttachments(
ctx context.Context,
messageIDs []uuid.UUID,
) (map[uuid.UUID][]Attachment, error) {
// Загрузка всех вложений одним запросом
}
Кэширование частозапрашиваемых данных:
type CachedMessageRepository struct {
cache *redis.Client
repo domain.MessageRepository
}
func (r *CachedMessageRepository) GetByID(ctx context.Context, id uuid.UUID) (*domain.Message, error) {
cacheKey := fmt.Sprintf("message:%s", id)
// Пробуем получить из кэша
if cached, err := r.cache.Get(ctx, cacheKey).Bytes(); err == nil {
var msg domain.Message
if json.Unmarshal(cached, &msg) == nil {
return &msg, nil
}
}
// Fallback к основной БД
msg, err := r.repo.GetByID(ctx, id)
if err != nil {
return nil, err
}
// Сохраняем в кэш на 5 минут
data, _ := json.Marshal(msg)
r.cache.Set(ctx, cacheKey, data, 5*time.Minute)
return msg, nil
}
Обработка высоких нагрузок
Для масштабирования применяю:
- Шардирование по conversation_id
- Асинхронную обработку через RabbitMQ/Kafka
- Read Replicas для распределения запросов на чтение
- Материализованные представления для агрегаций
Тестирование
func TestMessageRepository(t *testing.T) {
repo := NewPostgresMessageRepository(testDB)
t.Run("Create and retrieve message", func(t *testing.T) {
msg := &domain.Message{
ID: uuid.New(),
ConversationID: uuid.New(),
SenderID: uuid.New(),
Content: "Test message",
}
err := repo.Create(context.Background(), msg)
require.NoError(t, err)
retrieved, err := repo.GetByID(context.Background(), msg.ID)
require.NoError(t, err)
require.Equal(t, msg.Content, retrieved.Content)
})
}
Мониторинг и метрики
Интегрирую Prometheus метрики для отслеживания:
- Latency персистентных операций
- Частоту обращений к кэшу
- Ошибки при работе с БД
Такой подход обеспечивает масштабируемое, отказоустойчивое хранилище, способное обрабатывать от сотен до миллионов сообщений в секунду в зависимости от выбранной архитектуры и инстанцирования.