← Назад к вопросам
Как из Телеграм бота обращаться к базе данных?
1.8 Middle🔥 181 комментариев
#Асинхронность и многопоточность#Базы данных (SQL)
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Архитектура Telegram бота с БД
Телеграм бот должен иметь слоистую архитектуру: handlers (тонкие) → use cases (бизнес-логика) → repositories (доступ к БД). Это обеспечивает тестируемость и maintainability.
Подход с Dependency Injection
# app/database.py — слой доступа к БД
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from app.config import DATABASE_URL
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
def get_db() -> Session:
"""Создаёт сессию БД"""
session = SessionLocal()
try:
yield session
finally:
session.close()
# app/models.py — модели БД
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
telegram_id = Column(Integer, unique=True, index=True)
username = Column(String, nullable=True)
first_name = Column(String)
created_at = Column(DateTime, default=datetime.utcnow)
balance = Column(Integer, default=0)
class Order(Base):
__tablename__ = 'orders'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'))
status = Column(String, default='pending')
amount = Column(Integer)
created_at = Column(DateTime, default=datetime.utcnow)
Repositories — доступ к данным
# app/repositories/user_repository.py
from sqlalchemy.orm import Session
from app.models import User
from typing import Optional
class UserRepository:
def __init__(self, session: Session):
self.session = session
def get_by_telegram_id(self, telegram_id: int) -> Optional[User]:
"""Получить пользователя по Telegram ID"""
return self.session.query(User).filter(
User.telegram_id == telegram_id
).first()
def create(self, telegram_id: int, first_name: str, username: str = None) -> User:
"""Создать нового пользователя"""
user = User(
telegram_id=telegram_id,
first_name=first_name,
username=username
)
self.session.add(user)
self.session.commit()
self.session.refresh(user)
return user
def get_or_create(self, telegram_id: int, first_name: str, username: str = None) -> User:
"""Получить или создать пользователя"""
user = self.get_by_telegram_id(telegram_id)
if user is None:
user = self.create(telegram_id, first_name, username)
return user
def update_balance(self, user_id: int, amount: int):
"""Увеличить баланс"""
user = self.session.query(User).filter(User.id == user_id).first()
if user:
user.balance += amount
self.session.commit()
class OrderRepository:
def __init__(self, session: Session):
self.session = session
def create_order(self, user_id: int, amount: int) -> Order:
"""Создать заказ"""
order = Order(user_id=user_id, amount=amount, status='pending')
self.session.add(order)
self.session.commit()
self.session.refresh(order)
return order
def get_pending_orders(self, user_id: int):
"""Получить незавершённые заказы"""
return self.session.query(Order).filter(
Order.user_id == user_id,
Order.status == 'pending'
).all()
Use Cases — бизнес-логика
# app/use_cases/user_use_case.py
from app.repositories.user_repository import UserRepository
from sqlalchemy.orm import Session
class UserUseCase:
def __init__(self, user_repo: UserRepository):
self.user_repo = user_repo
def register_or_get_user(self, telegram_id: int, first_name: str, username: str = None):
"""Зарегистрировать нового пользователя или получить существующего"""
user = self.user_repo.get_or_create(telegram_id, first_name, username)
return user
def add_balance(self, user_id: int, amount: int):
"""Добавить баланс пользователю"""
if amount <= 0:
raise ValueError('Amount must be positive')
self.user_repo.update_balance(user_id, amount)
class OrderUseCase:
def __init__(self, order_repo, user_repo):
self.order_repo = order_repo
self.user_repo = user_repo
def create_order(self, user_id: int, amount: int):
"""Создать заказ"""
user = self.user_repo.session.query(User).filter(User.id == user_id).first()
if not user:
raise ValueError('User not found')
if user.balance < amount:
raise ValueError('Insufficient balance')
order = self.order_repo.create_order(user_id, amount)
self.user_repo.update_balance(user_id, -amount) # Вычитаем из баланса
return order
Handlers — тонкие обработчики (aiogram 3.x)
# app/handlers/user_handlers.py
from aiogram import Router, types
from aiogram.filters import CommandStart
from sqlalchemy.orm import Session
from app.use_cases.user_use_case import UserUseCase
from app.repositories.user_repository import UserRepository
from app.database import get_db
router = Router()
@router.message(CommandStart())
async def cmd_start(message: types.Message):
"""Обработчик команды /start"""
# Получаем сессию БД
session = next(get_db())
try:
# Создаём use case с внедрённой зависимостью
user_repo = UserRepository(session)
use_case = UserUseCase(user_repo)
# Выполняем бизнес-логику
user = use_case.register_or_get_user(
telegram_id=message.from_user.id,
first_name=message.from_user.first_name,
username=message.from_user.username
)
# Отправляем ответ
await message.answer(
f'Привет, {user.first_name}! Твой баланс: {user.balance} руб.'
)
finally:
session.close()
# app/handlers/order_handlers.py
from aiogram import Router, types
from aiogram.filters import CommandObject
from app.use_cases.order_use_case import OrderUseCase
from app.repositories.order_repository import OrderRepository
router = Router()
@router.message(Command('create_order'))
async def cmd_create_order(message: types.Message, command: CommandObject):
"""Команда: /create_order <amount>"""
session = next(get_db())
try:
if not command.args:
await message.answer('Укажите сумму: /create_order 100')
return
amount = int(command.args)
# Get user
user_repo = UserRepository(session)
user = user_repo.get_by_telegram_id(message.from_user.id)
if not user:
await message.answer('Вы не зарегистрированы. Введите /start')
return
# Create order
order_repo = OrderRepository(session)
use_case = OrderUseCase(order_repo, user_repo)
order = use_case.create_order(user.id, amount)
await message.answer(f'Заказ #{order.id} создан на сумму {amount} руб.')
except ValueError as e:
await message.answer(f'Ошибка: {str(e)}')
finally:
session.close()
Dependency Injection Container (более чистый подход)
# app/di/container.py
from app.database import SessionLocal
from app.repositories.user_repository import UserRepository
from app.repositories.order_repository import OrderRepository
from app.use_cases.user_use_case import UserUseCase
from app.use_cases.order_use_case import OrderUseCase
class Container:
def __init__(self):
self.session = SessionLocal()
@property
def user_repository(self) -> UserRepository:
return UserRepository(self.session)
@property
def order_repository(self) -> OrderRepository:
return OrderRepository(self.session)
@property
def user_use_case(self) -> UserUseCase:
return UserUseCase(self.user_repository)
@property
def order_use_case(self) -> OrderUseCase:
return OrderUseCase(self.order_repository, self.user_repository)
def close(self):
self.session.close()
# app/handlers/user_handlers.py (с DI контейнером)
from app.di.container import Container
@router.message(CommandStart())
async def cmd_start(message: types.Message):
container = Container()
try:
user = container.user_use_case.register_or_get_user(
telegram_id=message.from_user.id,
first_name=message.from_user.first_name,
username=message.from_user.username
)
await message.answer(f'Привет, {user.first_name}!')
finally:
container.close()
Асинхронная работа с БД
# Для асинхронной работы используй AsyncSession (SQLAlchemy 2.0)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
async_engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/db')
AsyncSessionLocal = sessionmaker(
async_engine, class_=AsyncSession, expire_on_commit=False
)
async def get_async_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
yield session
# В handlers
@router.message(CommandStart())
async def cmd_start(message: types.Message):
async with AsyncSessionLocal() as session:
user = await session.get(User, message.from_user.id)
await message.answer('OK')
Обработка ошибок БД
from sqlalchemy.exc import SQLAlchemyError, IntegrityError
import logging
logger = logging.getLogger(__name__)
@router.message(CommandStart())
async def cmd_start(message: types.Message):
session = next(get_db())
try:
user_repo = UserRepository(session)
user = user_repo.get_or_create(
telegram_id=message.from_user.id,
first_name=message.from_user.first_name
)
await message.answer('Вы зарегистрированы')
except IntegrityError:
logger.error('Database integrity error')
await message.answer('Ошибка базы данных. Попробуйте позже.')
except SQLAlchemyError as e:
logger.error(f'Database error: {e}')
await message.answer('Ошибка БД')
finally:
session.close()
Лучшие практики
- Repositories — все обращения к БД идут через них
- Use Cases — вся бизнес-логика
- Handlers — только получение данных и вызов use cases
- Dependency Injection — инъекция репозиториев в use cases
- Session management — всегда закрывай сессию в finally
- Error handling — логирование ошибок БД
- Transactions — используй транзакции для операций с несколькими таблицами
- Connection pooling — настрой пул соединений в SQLAlchemy
Этот подход делает код тестируемым, переиспользуемым и легко поддерживаемым.