← Назад к вопросам

Зачем нужен Celery Beat?

2.0 Middle🔥 111 комментариев
#Python Core#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Celery Beat — планировщик асинхронных задач

Celery Beat — это компонент фреймворка Celery, который отвечает за выполнение периодических и отложенных задач. Это как встроенный CRON для Python приложений, но более гибкий и интегрированный.

Основная задача Celery Beat

Celery Beat выполняет задачи по расписанию:

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

app = Celery('myapp')

# Настройка периодических задач
app.conf.beat_schedule = {
    # Выполняется каждые 30 секунд
    'check-status': {
        'task': 'tasks.check_system_status',
        'schedule': timedelta(seconds=30),
    },
    
    # Выполняется каждый день в 2:00 AM
    'daily-cleanup': {
        'task': 'tasks.cleanup_old_files',
        'schedule': crontab(hour=2, minute=0),
    },
    
    # Выполняется каждый час
    'hourly-report': {
        'task': 'tasks.generate_report',
        'schedule': timedelta(hours=1),
    },
}

Когда использовать Celery Beat

1. Периодическое выполнение задач

from celery import shared_task
import logging

logger = logging.getLogger(__name__)

@shared_task
def send_reminder_emails():
    """Отправляет напоминания пользователям каждый день"""
    users = User.objects.filter(has_pending_tasks=True)
    
    for user in users:
        send_email(
            user.email,
            subject="У вас есть предстоящие задачи",
            template="reminder.html"
        )
    
    logger.info(f"Отправлено напоминаний: {users.count()}")

# В beat_schedule:
app.conf.beat_schedule['send-reminders'] = {
    'task': 'myapp.tasks.send_reminder_emails',
    'schedule': crontab(hour=9, minute=0),  # Каждый день в 9:00
}

2. Очистка старых данных

@shared_task
def cleanup_expired_sessions():
    """Удаляет истекшие сессии из БД"""
    from django.utils import timezone
    from datetime import timedelta
    
    cutoff_date = timezone.now() - timedelta(days=30)
    
    deleted_count, _ = Session.objects.filter(
        expire_date__lt=cutoff_date
    ).delete()
    
    logger.info(f"Удалено сессий: {deleted_count}")

app.conf.beat_schedule['cleanup-sessions'] = {
    'task': 'myapp.tasks.cleanup_expired_sessions',
    'schedule': crontab(hour=3, minute=0),  # 3:00 AM каждый день
}

3. Синхронизация данных

@shared_task
def sync_with_external_api():
    """Синхронизирует данные с внешним API"""
    from external_api import get_latest_data
    
    try:
        data = get_latest_data()
        
        for item in data:
            Product.objects.update_or_create(
                external_id=item['id'],
                defaults={
                    'name': item['name'],
                    'price': item['price'],
                    'last_synced': timezone.now(),
                }
            )
        
        logger.info(f"Синхронизировано товаров: {len(data)}")
    
    except Exception as e:
        logger.error(f"Ошибка синхронизации: {e}")

app.conf.beat_schedule['sync-api'] = {
    'task': 'myapp.tasks.sync_with_external_api',
    'schedule': timedelta(minutes=30),  # Каждые 30 минут
}

4. Генерация отчетов

@shared_task
def generate_daily_report():
    """Генерирует ежедневный отчет и отправляет по email"""
    from datetime import datetime, timedelta
    
    yesterday = datetime.now() - timedelta(days=1)
    
    stats = {
        'users_registered': User.objects.filter(
            date_joined__date=yesterday.date()
        ).count(),
        'orders_placed': Order.objects.filter(
            created_at__date=yesterday.date()
        ).count(),
        'revenue': Order.objects.filter(
            created_at__date=yesterday.date()
        ).aggregate(Sum('total'))['total__sum'] or 0,
    }
    
    # Отправляем отчет
    send_email(
        'admin@company.com',
        subject=f"Дневной отчет {yesterday.date()}",
        context=stats,
        template='report.html'
    )
    
    logger.info(f"Отчет отправлен: {stats}")

app.conf.beat_schedule['daily-report'] = {
    'task': 'myapp.tasks.generate_daily_report',
    'schedule': crontab(hour=8, minute=0),  # 8:00 AM
}

Архитектура Celery Beat

┌─────────────────┐
│  Celery Beat    │  (Scheduler)
│  (процесс)      │  Хранит расписание
└────────┬────────┘  Проверяет время
         │           Отправляет задачи в очередь
         │
    ┌────▼────────────────────┐
    │  Message Broker (Redis) │
    │  (очередь задач)        │
    └────┬────────────────────┘
         │
    ┌────▼──────────────────────┐
    │  Celery Worker Process(es)│
    │  Выполняют задачи         │
    └───────────────────────────┘

Настройка Celery Beat

Вариант 1: В памяти (для разработки)

from celery import Celery
from celery.schedules import crontab

app = Celery('myapp')

app.conf.beat_schedule = {
    'task-name': {
        'task': 'module.function',
        'schedule': crontab(hour=0, minute=0),
    }
}

Вариант 2: С сохранением в БД (для production)

# pip install django-celery-beat

app.conf.beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'

Тогда расписание хранится в БД и может изменяться без перезагрузки.

Примеры расписаний

from celery.schedules import crontab
from datetime import timedelta

app.conf.beat_schedule = {
    # Каждые 30 секунд
    'every-30-seconds': {
        'task': 'tasks.check_status',
        'schedule': timedelta(seconds=30),
    },
    
    # Каждый час
    'hourly': {
        'task': 'tasks.hourly_task',
        'schedule': timedelta(hours=1),
    },
    
    # Каждый день в 3:00 AM
    'daily-at-3am': {
        'task': 'tasks.daily_task',
        'schedule': crontab(hour=3, minute=0),
    },
    
    # Каждый понедельник в 9:00 AM
    'weekly-monday': {
        'task': 'tasks.weekly_task',
        'schedule': crontab(day_of_week=1, hour=9, minute=0),
    },
    
    # Каждый первый день месяца в полночь
    'monthly': {
        'task': 'tasks.monthly_task',
        'schedule': crontab(day_of_month=1, hour=0, minute=0),
    },
    
    # Каждые 5 минут
    'every-5-minutes': {
        'task': 'tasks.frequent_task',
        'schedule': timedelta(minutes=5),
    },
}

Запуск Celery Beat

# Запустить Beat (планировщик)
celery -A myapp beat

# В отдельном окне: запустить Worker (исполнитель)
celery -A myapp worker -l info

# Или вместе (не рекомендуется для production)
celery -A myapp worker --beat -l info

Альтернативы

APScheduler

Более легковесный вариант для простых кейсов:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

@scheduler.scheduled_job('cron', hour=17)
def scheduled_job():
    print('Это работает каждый день в 5 PM')

scheduler.start()

Система CRON

Для старых систем можно использовать crontab:

# В crontab (каждый час)
0 * * * * /usr/bin/python /path/to/script.py

Но это менее гибко и сложнее интегрировать с Python.

Преимущества Celery Beat

  • Интеграция с Celery — один инструмент для всех асинхронных задач
  • Надежность — можно сохранять расписание в БД
  • Масштабируемость — несколько workers могут выполнять одни задачи
  • Мониторинг — видны все выполненные задачи
  • Гибкость — поддерживает CRON синтаксис и интервалы
  • Обработка ошибок — retry логика, fallback handlers

Практический совет

Для production используй комбинацию:

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# Расписание в БД
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

# Таймзона
CELERY_TIMEZONE = 'Europe/Moscow'

Это даст вам полный контроль над расписанием без перезагрузки приложения.