← Назад к вопросам

Как у telegram бота реализовать работу под большой нагрузкой?

3.0 Senior🔥 191 комментариев
#Архитектура и паттерны#Асинхронность и многопоточность#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Архитектура под нагрузку

Телеграм-боты требуют особого подхода к масштабированию из-за очередей обновлений и частых запросов. Вот стратегия для высоконагруженных систем.

1. Асинхронная обработка (asyncio)

Основной инструмент — асинхронные функции. Вместо блокирующих вызовов используй async/await:

import asyncio
from aiogram import Router, types
from aiogram.fsm.context import FSMContext

router = Router()

@router.message()
async def handle_message(message: types.Message):
    result = await process_expensive_operation(message.text)
    await message.answer(result)

async def process_expensive_operation(text: str) -> str:
    await asyncio.sleep(1)
    return f"Обработано: {text}"

2. Пулинг vs Webhook

Пулинг — бот сам спрашивает обновления каждые N секунд. Простая настройка, но задержка и нагрузка на API.

Webhook — Telegram отправляет обновления напрямую. Моментальная доставка, требует HTTPS.

from aiogram.types import Update
from fastapi import FastAPI

app = FastAPI()

@app.post("/webhook")
async def webhook(update: dict):
    telegram_update = Update(**update)
    await dp.feed_update(bot, telegram_update)
    return {"ok": True}

3. Очередь задач (Celery + Redis)

Для тяжелых операций вынеси их в фоновую очередь:

from celery import Celery
from redis import Redis

celery_app = Celery('bot', broker='redis://localhost:6379')

@celery_app.task
def process_user_request(user_id: int, query: str):
    result = expensive_calculation(query)
    return result

@router.message()
async def handle_message(message: types.Message):
    task = process_user_request.delay(message.from_user.id, message.text)
    await message.answer(f"Обработаю: {task.id}")

4. Кэширование (Redis)

Уменьши нагрузку на БД кэшированием:

from redis.asyncio import Redis
import json

redis = Redis(host='localhost', port=6379, decode_responses=True)

async def get_user_data(user_id: int) -> dict:
    cached = await redis.get(f"user:{user_id}")
    if cached:
        return json.loads(cached)
    
    user_data = await db.get_user(user_id)
    await redis.setex(f"user:{user_id}", 300, json.dumps(user_data))
    return user_data

5. Конкурентность и лимиты

import asyncio

semaphore = asyncio.Semaphore(10)  # Max 10 параллельных

async def handle_with_limit(message: types.Message):
    async with semaphore:
        result = await process_message(message)
        await message.answer(result)

6. Rate Limiting

from datetime import datetime, timedelta
from collections import defaultdict

user_requests = defaultdict(list)

async def check_rate_limit(user_id: int, limit: int = 5, window: int = 60) -> bool:
    now = datetime.now()
    user_requests[user_id] = [
        ts for ts in user_requests[user_id]
        if now - ts < timedelta(seconds=window)
    ]
    
    if len(user_requests[user_id]) >= limit:
        return False
    
    user_requests[user_id].append(now)
    return True

@router.message()
async def message_handler(message: types.Message):
    if not await check_rate_limit(message.from_user.id):
        await message.answer("Слишком много запросов")
        return

7. Мониторинг

import logging
from prometheus_client import Counter, Histogram

logger = logging.getLogger(__name__)
message_counter = Counter('messages_processed', 'Total')
processing_time = Histogram('message_processing_seconds', 'Time')

@router.message()
async def message_handler(message: types.Message):
    start = time.time()
    try:
        result = await process_message(message)
        message_counter.inc()
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
    finally:
        processing_time.observe(time.time() - start)

8. Горизонтальное масштабирование

Используй Redis для синхронизации состояния между инстансами:

from aiogram.fsm.storage.redis import RedisStorage

storage = RedisStorage.from_url("redis://localhost:6379")
bot = Bot(token=TOKEN)
dp = Dispatcher(storage=storage)

Итог: Высоконагруженный бот требует async/await, webhook'ов, очередей задач, кэширования и rate limiting с мониторингом.

Как у telegram бота реализовать работу под большой нагрузкой? | PrepBro