← Назад к вопросам
Зачем нужен Celery Beat?
2.0 Middle🔥 111 комментариев
#Python Core#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Celery Beat — планировщик асинхронных задач
Celery Beat — это компонент фреймворка Celery, который отвечает за выполнение периодических и отложенных задач. Это как встроенный CRON для Python приложений, но более гибкий и интегрированный.
Основная задача Celery Beat
Celery Beat выполняет задачи по расписанию:
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
app = Celery('myapp')
# Настройка периодических задач
app.conf.beat_schedule = {
# Выполняется каждые 30 секунд
'check-status': {
'task': 'tasks.check_system_status',
'schedule': timedelta(seconds=30),
},
# Выполняется каждый день в 2:00 AM
'daily-cleanup': {
'task': 'tasks.cleanup_old_files',
'schedule': crontab(hour=2, minute=0),
},
# Выполняется каждый час
'hourly-report': {
'task': 'tasks.generate_report',
'schedule': timedelta(hours=1),
},
}
Когда использовать Celery Beat
1. Периодическое выполнение задач
from celery import shared_task
import logging
logger = logging.getLogger(__name__)
@shared_task
def send_reminder_emails():
"""Отправляет напоминания пользователям каждый день"""
users = User.objects.filter(has_pending_tasks=True)
for user in users:
send_email(
user.email,
subject="У вас есть предстоящие задачи",
template="reminder.html"
)
logger.info(f"Отправлено напоминаний: {users.count()}")
# В beat_schedule:
app.conf.beat_schedule['send-reminders'] = {
'task': 'myapp.tasks.send_reminder_emails',
'schedule': crontab(hour=9, minute=0), # Каждый день в 9:00
}
2. Очистка старых данных
@shared_task
def cleanup_expired_sessions():
"""Удаляет истекшие сессии из БД"""
from django.utils import timezone
from datetime import timedelta
cutoff_date = timezone.now() - timedelta(days=30)
deleted_count, _ = Session.objects.filter(
expire_date__lt=cutoff_date
).delete()
logger.info(f"Удалено сессий: {deleted_count}")
app.conf.beat_schedule['cleanup-sessions'] = {
'task': 'myapp.tasks.cleanup_expired_sessions',
'schedule': crontab(hour=3, minute=0), # 3:00 AM каждый день
}
3. Синхронизация данных
@shared_task
def sync_with_external_api():
"""Синхронизирует данные с внешним API"""
from external_api import get_latest_data
try:
data = get_latest_data()
for item in data:
Product.objects.update_or_create(
external_id=item['id'],
defaults={
'name': item['name'],
'price': item['price'],
'last_synced': timezone.now(),
}
)
logger.info(f"Синхронизировано товаров: {len(data)}")
except Exception as e:
logger.error(f"Ошибка синхронизации: {e}")
app.conf.beat_schedule['sync-api'] = {
'task': 'myapp.tasks.sync_with_external_api',
'schedule': timedelta(minutes=30), # Каждые 30 минут
}
4. Генерация отчетов
@shared_task
def generate_daily_report():
"""Генерирует ежедневный отчет и отправляет по email"""
from datetime import datetime, timedelta
yesterday = datetime.now() - timedelta(days=1)
stats = {
'users_registered': User.objects.filter(
date_joined__date=yesterday.date()
).count(),
'orders_placed': Order.objects.filter(
created_at__date=yesterday.date()
).count(),
'revenue': Order.objects.filter(
created_at__date=yesterday.date()
).aggregate(Sum('total'))['total__sum'] or 0,
}
# Отправляем отчет
send_email(
'admin@company.com',
subject=f"Дневной отчет {yesterday.date()}",
context=stats,
template='report.html'
)
logger.info(f"Отчет отправлен: {stats}")
app.conf.beat_schedule['daily-report'] = {
'task': 'myapp.tasks.generate_daily_report',
'schedule': crontab(hour=8, minute=0), # 8:00 AM
}
Архитектура Celery Beat
┌─────────────────┐
│ Celery Beat │ (Scheduler)
│ (процесс) │ Хранит расписание
└────────┬────────┘ Проверяет время
│ Отправляет задачи в очередь
│
┌────▼────────────────────┐
│ Message Broker (Redis) │
│ (очередь задач) │
└────┬────────────────────┘
│
┌────▼──────────────────────┐
│ Celery Worker Process(es)│
│ Выполняют задачи │
└───────────────────────────┘
Настройка Celery Beat
Вариант 1: В памяти (для разработки)
from celery import Celery
from celery.schedules import crontab
app = Celery('myapp')
app.conf.beat_schedule = {
'task-name': {
'task': 'module.function',
'schedule': crontab(hour=0, minute=0),
}
}
Вариант 2: С сохранением в БД (для production)
# pip install django-celery-beat
app.conf.beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'
Тогда расписание хранится в БД и может изменяться без перезагрузки.
Примеры расписаний
from celery.schedules import crontab
from datetime import timedelta
app.conf.beat_schedule = {
# Каждые 30 секунд
'every-30-seconds': {
'task': 'tasks.check_status',
'schedule': timedelta(seconds=30),
},
# Каждый час
'hourly': {
'task': 'tasks.hourly_task',
'schedule': timedelta(hours=1),
},
# Каждый день в 3:00 AM
'daily-at-3am': {
'task': 'tasks.daily_task',
'schedule': crontab(hour=3, minute=0),
},
# Каждый понедельник в 9:00 AM
'weekly-monday': {
'task': 'tasks.weekly_task',
'schedule': crontab(day_of_week=1, hour=9, minute=0),
},
# Каждый первый день месяца в полночь
'monthly': {
'task': 'tasks.monthly_task',
'schedule': crontab(day_of_month=1, hour=0, minute=0),
},
# Каждые 5 минут
'every-5-minutes': {
'task': 'tasks.frequent_task',
'schedule': timedelta(minutes=5),
},
}
Запуск Celery Beat
# Запустить Beat (планировщик)
celery -A myapp beat
# В отдельном окне: запустить Worker (исполнитель)
celery -A myapp worker -l info
# Или вместе (не рекомендуется для production)
celery -A myapp worker --beat -l info
Альтернативы
APScheduler
Более легковесный вариант для простых кейсов:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
@scheduler.scheduled_job('cron', hour=17)
def scheduled_job():
print('Это работает каждый день в 5 PM')
scheduler.start()
Система CRON
Для старых систем можно использовать crontab:
# В crontab (каждый час)
0 * * * * /usr/bin/python /path/to/script.py
Но это менее гибко и сложнее интегрировать с Python.
Преимущества Celery Beat
- Интеграция с Celery — один инструмент для всех асинхронных задач
- Надежность — можно сохранять расписание в БД
- Масштабируемость — несколько workers могут выполнять одни задачи
- Мониторинг — видны все выполненные задачи
- Гибкость — поддерживает CRON синтаксис и интервалы
- Обработка ошибок — retry логика, fallback handlers
Практический совет
Для production используй комбинацию:
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# Расписание в БД
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# Таймзона
CELERY_TIMEZONE = 'Europe/Moscow'
Это даст вам полный контроль над расписанием без перезагрузки приложения.