Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как работает пагинация в SQL
Пагинация — это разделение большого набора результатов на страницы. Это критично для производительности и UX, особенно когда в таблице миллионы строк.
1. Основной метод: LIMIT и OFFSET
Базовый синтаксис
-- Получить 10 результатов со смещением 20
SELECT id, name, email FROM users
ORDER BY created_at DESC
LIMIT 10 OFFSET 20;
- LIMIT 10 — количество строк на странице
- OFFSET 20 — сколько строк пропустить (для страницы 3: 3 * 10 - 10 = 20)
В Python с SQLAlchemy
from sqlalchemy import select
from sqlalchemy.orm import AsyncSession
async def get_users_paginated(
session: AsyncSession,
page: int = 1,
page_size: int = 10
) -> tuple[list, int]:
"""
Получить пользователей с пагинацией.
Returns: (users, total_count)
"""
# Считаем общее количество (без LIMIT/OFFSET)
total_query = select(func.count()).select_from(User)
total = (await session.execute(total_query)).scalar()
# Основной запрос с пагинацией
offset = (page - 1) * page_size
query = (
select(User)
.order_by(User.created_at.desc())
.limit(page_size)
.offset(offset)
)
users = (await session.execute(query)).scalars().all()
return users, total
В API endpoint
from fastapi import APIRouter, Query
from pydantic import BaseModel
router = APIRouter()
class PaginationResponse(BaseModel):
items: list
total: int
page: int
page_size: int
total_pages: int
@router.get("/api/v1/users", response_model=PaginationResponse)
async def list_users(
page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1, le=100),
session: AsyncSession = Depends(get_session)
):
users, total = await get_users_paginated(session, page, page_size)
total_pages = (total + page_size - 1) // page_size
return PaginationResponse(
items=users,
total=total,
page=page,
page_size=page_size,
total_pages=total_pages
)
2. Проблема OFFSET для больших данных
Проблема
-- Это медленно для больших страниц!
SELECT * FROM users WHERE deleted_at IS NULL
ORDER BY id
LIMIT 10 OFFSET 10000000; -- Базе придётся прочитать 10 млн строк
OFFSET нужно пропустить, а это медленно!
Решение: Keyset pagination (Cursor-based)
Вместо смещения используем последний ID/ключ:
-- Получить следующие 10 записей после ID 5000
SELECT * FROM users
WHERE id > 5000 -- Начинаем ОТ последнего ID
ORDER BY id
LIMIT 10;
В Python с SQLAlchemy
from typing import Optional
async def get_users_cursor(
session: AsyncSession,
cursor: Optional[int] = None,
page_size: int = 10
) -> dict:
"""
Keyset/Cursor pagination.
cursor — ID последней записи предыдущей страницы
"""
query = select(User).order_by(User.id)
# Если cursor передан, начинаем после него
if cursor:
query = query.where(User.id > cursor)
# Получаем page_size + 1 для определения наличия следующей страницы
query = query.limit(page_size + 1)
results = (await session.execute(query)).scalars().all()
# Проверяем, есть ли ещё результаты
has_more = len(results) > page_size
if has_more:
results = results[:page_size]
return {
"items": results,
"next_cursor": results[-1].id if results and has_more else None,
"has_more": has_more
}
@router.get("/api/v1/users/cursor")
async def list_users_cursor(
cursor: Optional[int] = Query(None),
page_size: int = Query(10, ge=1, le=100),
session: AsyncSession = Depends(get_session)
):
return await get_users_cursor(session, cursor, page_size)
3. Сложная пагинация по нескольким полям
Keyset с сортировкой по дате и ID
-- Сортируем по дате, потом по ID для уникальности
SELECT * FROM posts
WHERE (created_at, id) > (:last_date, :last_id)
ORDER BY created_at DESC, id DESC
LIMIT 20;
В Python
from datetime import datetime
from typing import Optional, Tuple
async def get_posts_advanced(
session: AsyncSession,
cursor_date: Optional[datetime] = None,
cursor_id: Optional[int] = None,
page_size: int = 20
) -> dict:
query = select(Post).order_by(Post.created_at.desc(), Post.id.desc())
# Фильтруем по курсору (tuple comparison)
if cursor_date and cursor_id:
query = query.where(
(Post.created_at < cursor_date) |
((Post.created_at == cursor_date) & (Post.id < cursor_id))
)
results = (await session.execute(query.limit(page_size + 1))).scalars().all()
has_more = len(results) > page_size
if has_more:
results = results[:page_size]
next_cursor = None
if results and has_more:
last_post = results[-1]
next_cursor = {
"created_at": last_post.created_at.isoformat(),
"id": last_post.id
}
return {
"items": results,
"next_cursor": next_cursor,
"has_more": has_more
}
4. Оптимизация: Индексы
Обязательные индексы для пагинации
-- Миграция Goose
-- +goose Up
-- Индекс для сортировки по дате
CREATE INDEX idx_users_created_at ON users(created_at DESC);
-- Составной индекс для cursor-based пагинации
CREATE INDEX idx_users_created_at_id ON users(created_at DESC, id DESC);
-- Если есть фильтр (deleted_at IS NULL)
CREATE INDEX idx_users_deleted_active ON users(deleted_at, created_at DESC)
WHERE deleted_at IS NULL;
-- +goose Down
DROP INDEX idx_users_created_at;
DROP INDEX idx_users_created_at_id;
DROP INDEX idx_users_deleted_active;
5. Сравнение методов
| Метод | Плюсы | Минусы | Использование |
|---|---|---|---|
| OFFSET | Просто, можно перейти на любую страницу | Медленно на больших смещениях | Маленькие наборы данных (<100k) |
| Cursor | Быстро на больших данных, не требует OFFSET | Нельзя перейти на точную страницу, сложнее | Production, big data, infinite scroll |
| Seek | Очень быстро для range запросов | Требует WHERE условие | Специфические фильтры |
6. Практический пример API
from fastapi import APIRouter
from typing import Optional
router = APIRouter()
@router.get("/api/v1/posts")
async def list_posts(
# Cursor-based пагинация
cursor: Optional[str] = Query(None, description="Base64(created_at:id)"),
limit: int = Query(20, ge=1, le=100),
session: AsyncSession = Depends(get_session)
):
query = select(Post).order_by(Post.created_at.desc(), Post.id.desc())
# Декодируем cursor
if cursor:
import base64
data = base64.b64decode(cursor).decode().split(":")
created_at = datetime.fromisoformat(data[0])
post_id = int(data[1])
query = query.where(
(Post.created_at < created_at) |
((Post.created_at == created_at) & (Post.id < post_id))
)
# Получаем limit + 1 для has_next
posts = (await session.execute(query.limit(limit + 1))).scalars().all()
has_next = len(posts) > limit
if has_next:
posts = posts[:limit]
# Кодируем следующий cursor
next_cursor = None
if posts and has_next:
last = posts[-1]
import base64
cursor_str = f"{last.created_at.isoformat()}:{last.id}"
next_cursor = base64.b64encode(cursor_str.encode()).decode()
return {
"data": posts,
"pagination": {
"next_cursor": next_cursor,
"has_next": has_next
}
}
Лучшие практики
- Используй OFFSET для маленьких наборов данных
- Переходи на Cursor когда таблица > 100k строк
- Всегда индексируй поля сортировки
- Ограничивай page_size максимум 100
- Используй EXPLAIN для проверки индексов
- Кэшируй count(*) если часто меняется