← Назад к вопросам

Как из Телеграм бота обращаться к базе данных?

1.8 Middle🔥 181 комментариев
#Асинхронность и многопоточность#Базы данных (SQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Архитектура Telegram бота с БД

Телеграм бот должен иметь слоистую архитектуру: handlers (тонкие) → use cases (бизнес-логика) → repositories (доступ к БД). Это обеспечивает тестируемость и maintainability.

Подход с Dependency Injection

# app/database.py — слой доступа к БД
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from app.config import DATABASE_URL

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)

def get_db() -> Session:
    """Создаёт сессию БД"""
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()

# app/models.py — модели БД
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    telegram_id = Column(Integer, unique=True, index=True)
    username = Column(String, nullable=True)
    first_name = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)
    balance = Column(Integer, default=0)

class Order(Base):
    __tablename__ = 'orders'
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    status = Column(String, default='pending')
    amount = Column(Integer)
    created_at = Column(DateTime, default=datetime.utcnow)

Repositories — доступ к данным

# app/repositories/user_repository.py
from sqlalchemy.orm import Session
from app.models import User
from typing import Optional

class UserRepository:
    def __init__(self, session: Session):
        self.session = session
    
    def get_by_telegram_id(self, telegram_id: int) -> Optional[User]:
        """Получить пользователя по Telegram ID"""
        return self.session.query(User).filter(
            User.telegram_id == telegram_id
        ).first()
    
    def create(self, telegram_id: int, first_name: str, username: str = None) -> User:
        """Создать нового пользователя"""
        user = User(
            telegram_id=telegram_id,
            first_name=first_name,
            username=username
        )
        self.session.add(user)
        self.session.commit()
        self.session.refresh(user)
        return user
    
    def get_or_create(self, telegram_id: int, first_name: str, username: str = None) -> User:
        """Получить или создать пользователя"""
        user = self.get_by_telegram_id(telegram_id)
        if user is None:
            user = self.create(telegram_id, first_name, username)
        return user
    
    def update_balance(self, user_id: int, amount: int):
        """Увеличить баланс"""
        user = self.session.query(User).filter(User.id == user_id).first()
        if user:
            user.balance += amount
            self.session.commit()

class OrderRepository:
    def __init__(self, session: Session):
        self.session = session
    
    def create_order(self, user_id: int, amount: int) -> Order:
        """Создать заказ"""
        order = Order(user_id=user_id, amount=amount, status='pending')
        self.session.add(order)
        self.session.commit()
        self.session.refresh(order)
        return order
    
    def get_pending_orders(self, user_id: int):
        """Получить незавершённые заказы"""
        return self.session.query(Order).filter(
            Order.user_id == user_id,
            Order.status == 'pending'
        ).all()

Use Cases — бизнес-логика

# app/use_cases/user_use_case.py
from app.repositories.user_repository import UserRepository
from sqlalchemy.orm import Session

class UserUseCase:
    def __init__(self, user_repo: UserRepository):
        self.user_repo = user_repo
    
    def register_or_get_user(self, telegram_id: int, first_name: str, username: str = None):
        """Зарегистрировать нового пользователя или получить существующего"""
        user = self.user_repo.get_or_create(telegram_id, first_name, username)
        return user
    
    def add_balance(self, user_id: int, amount: int):
        """Добавить баланс пользователю"""
        if amount <= 0:
            raise ValueError('Amount must be positive')
        self.user_repo.update_balance(user_id, amount)

class OrderUseCase:
    def __init__(self, order_repo, user_repo):
        self.order_repo = order_repo
        self.user_repo = user_repo
    
    def create_order(self, user_id: int, amount: int):
        """Создать заказ"""
        user = self.user_repo.session.query(User).filter(User.id == user_id).first()
        if not user:
            raise ValueError('User not found')
        if user.balance < amount:
            raise ValueError('Insufficient balance')
        
        order = self.order_repo.create_order(user_id, amount)
        self.user_repo.update_balance(user_id, -amount)  # Вычитаем из баланса
        return order

Handlers — тонкие обработчики (aiogram 3.x)

# app/handlers/user_handlers.py
from aiogram import Router, types
from aiogram.filters import CommandStart
from sqlalchemy.orm import Session
from app.use_cases.user_use_case import UserUseCase
from app.repositories.user_repository import UserRepository
from app.database import get_db

router = Router()

@router.message(CommandStart())
async def cmd_start(message: types.Message):
    """Обработчик команды /start"""
    # Получаем сессию БД
    session = next(get_db())
    
    try:
        # Создаём use case с внедрённой зависимостью
        user_repo = UserRepository(session)
        use_case = UserUseCase(user_repo)
        
        # Выполняем бизнес-логику
        user = use_case.register_or_get_user(
            telegram_id=message.from_user.id,
            first_name=message.from_user.first_name,
            username=message.from_user.username
        )
        
        # Отправляем ответ
        await message.answer(
            f'Привет, {user.first_name}! Твой баланс: {user.balance} руб.'
        )
    finally:
        session.close()

# app/handlers/order_handlers.py
from aiogram import Router, types
from aiogram.filters import CommandObject
from app.use_cases.order_use_case import OrderUseCase
from app.repositories.order_repository import OrderRepository

router = Router()

@router.message(Command('create_order'))
async def cmd_create_order(message: types.Message, command: CommandObject):
    """Команда: /create_order <amount>"""
    session = next(get_db())
    
    try:
        if not command.args:
            await message.answer('Укажите сумму: /create_order 100')
            return
        
        amount = int(command.args)
        
        # Get user
        user_repo = UserRepository(session)
        user = user_repo.get_by_telegram_id(message.from_user.id)
        
        if not user:
            await message.answer('Вы не зарегистрированы. Введите /start')
            return
        
        # Create order
        order_repo = OrderRepository(session)
        use_case = OrderUseCase(order_repo, user_repo)
        order = use_case.create_order(user.id, amount)
        
        await message.answer(f'Заказ #{order.id} создан на сумму {amount} руб.')
    except ValueError as e:
        await message.answer(f'Ошибка: {str(e)}')
    finally:
        session.close()

Dependency Injection Container (более чистый подход)

# app/di/container.py
from app.database import SessionLocal
from app.repositories.user_repository import UserRepository
from app.repositories.order_repository import OrderRepository
from app.use_cases.user_use_case import UserUseCase
from app.use_cases.order_use_case import OrderUseCase

class Container:
    def __init__(self):
        self.session = SessionLocal()
    
    @property
    def user_repository(self) -> UserRepository:
        return UserRepository(self.session)
    
    @property
    def order_repository(self) -> OrderRepository:
        return OrderRepository(self.session)
    
    @property
    def user_use_case(self) -> UserUseCase:
        return UserUseCase(self.user_repository)
    
    @property
    def order_use_case(self) -> OrderUseCase:
        return OrderUseCase(self.order_repository, self.user_repository)
    
    def close(self):
        self.session.close()

# app/handlers/user_handlers.py (с DI контейнером)
from app.di.container import Container

@router.message(CommandStart())
async def cmd_start(message: types.Message):
    container = Container()
    try:
        user = container.user_use_case.register_or_get_user(
            telegram_id=message.from_user.id,
            first_name=message.from_user.first_name,
            username=message.from_user.username
        )
        await message.answer(f'Привет, {user.first_name}!')
    finally:
        container.close()

Асинхронная работа с БД

# Для асинхронной работы используй AsyncSession (SQLAlchemy 2.0)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

async_engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/db')
AsyncSessionLocal = sessionmaker(
    async_engine, class_=AsyncSession, expire_on_commit=False
)

async def get_async_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        yield session

# В handlers
@router.message(CommandStart())
async def cmd_start(message: types.Message):
    async with AsyncSessionLocal() as session:
        user = await session.get(User, message.from_user.id)
        await message.answer('OK')

Обработка ошибок БД

from sqlalchemy.exc import SQLAlchemyError, IntegrityError
import logging

logger = logging.getLogger(__name__)

@router.message(CommandStart())
async def cmd_start(message: types.Message):
    session = next(get_db())
    try:
        user_repo = UserRepository(session)
        user = user_repo.get_or_create(
            telegram_id=message.from_user.id,
            first_name=message.from_user.first_name
        )
        await message.answer('Вы зарегистрированы')
    except IntegrityError:
        logger.error('Database integrity error')
        await message.answer('Ошибка базы данных. Попробуйте позже.')
    except SQLAlchemyError as e:
        logger.error(f'Database error: {e}')
        await message.answer('Ошибка БД')
    finally:
        session.close()

Лучшие практики

  1. Repositories — все обращения к БД идут через них
  2. Use Cases — вся бизнес-логика
  3. Handlers — только получение данных и вызов use cases
  4. Dependency Injection — инъекция репозиториев в use cases
  5. Session management — всегда закрывай сессию в finally
  6. Error handling — логирование ошибок БД
  7. Transactions — используй транзакции для операций с несколькими таблицами
  8. Connection pooling — настрой пул соединений в SQLAlchemy

Этот подход делает код тестируемым, переиспользуемым и легко поддерживаемым.