← Назад к вопросам
Какие знаешь инструменты запуска программ по расписанию в Python?
1.7 Middle🔥 171 комментариев
#Python Core
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Инструменты запуска программ по расписанию в Python
Запуск программ по расписанию необходим для автоматизации задач: отправка писем, резервные копии, очистка кэша и т.п.
1. schedule — простая библиотека
Самый простой способ для базовых задач:
import schedule
import time
def send_email_report():
print("Отправляем отчёт по почте")
# Запустить каждый день в 10:30
schedule.every().day.at("10:30").do(send_email_report)
# Запустить каждый час
schedule.every().hour.do(send_email_report)
# Запустить каждые 10 минут
schedule.every(10).minutes.do(send_email_report)
# Запустить каждый понедельник в 9:00
schedule.every().monday.at("09:00").do(send_email_report)
# Бесконечный цикл проверки
while True:
schedule.run_pending()
time.sleep(60)
Использование с аргументами
import schedule
def process_batch(batch_id, priority="normal"):
print(f"Обработка batch {batch_id} с приоритетом {priority}")
# Передать аргументы
schedule.every().day.at("10:00").do(process_batch, batch_id=1, priority="high")
# Проверить следующий запуск
print(schedule.next_run())
# Получить количество отложенных задач
print(schedule.idle_seconds())
Недостатки schedule
- Нет сохранения состояния (если упадёт скрипт, всё теряется)
- Не подходит для распределённых систем
- Сложно масштабировать
2. APScheduler — мощная библиотека
Для более сложных сценариев:
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
def task_function(name):
print(f"Выполнение задачи для {name} в {datetime.now()}")
# Создать scheduler в фоне
scheduler = BackgroundScheduler()
# Добавить задачу на определённое время
scheduler.add_job(
func=task_function,
trigger="cron",
hour=14,
minute=30,
args=["Иван"],
id="my_job"
)
# Добавить задачу каждый час
scheduler.add_job(
func=task_function,
trigger="interval",
hours=1,
args=["Auto"]
)
# Запустить scheduler
scheduler.start()
try:
# Программа продолжает работу
import time
while True:
time.sleep(1)
except KeyboardInterrupt:
scheduler.shutdown()
Триггеры APScheduler
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
# date триггер — одноразовый запуск
from datetime import datetime
scheduler.add_job(
func=backup_database,
trigger="date",
run_date=datetime(2024, 12, 25, 12, 0),
id="christmas_backup"
)
# interval триггер — каждый промежуток времени
scheduler.add_job(
func=check_status,
trigger="interval",
seconds=30
)
# cron триггер — сложное расписание
scheduler.add_job(
func=daily_report,
trigger="cron",
day_of_week="mon-fri", # Пн-Пт
hour=9,
minute=0,
id="weekday_report"
)
Cron синтаксис
# Поле | Допустимые значения
# -------- | ------------------
# year | 4-значное число
# month | 1-12
# day | 1-31
# week | 1-53 (неделя года)
# day_of_week | 0-6 (0=пн, 6=вс) или mon,tue,wed,thu,fri,sat,sun
# hour | 0-23
# minute | 0-59
# second | 0-59
# Примеры
# "*/5 * * * *" — каждые 5 минут
# "0 0 * * *" — каждый день в 00:00
# "0 9 * * mon-fri" — рабочие дни в 09:00
# "0 0 1 * *" — первый день месяца
# "0 0 1 1 *" — первый день года
Persistent Jobs (сохраняющиеся задачи)
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from sqlalchemy import create_engine
engine = create_engine('sqlite:///jobs.sqlite')
jobstores = {
'default': SQLAlchemyJobStore(engine=engine)
}
scheduler = BackgroundScheduler(jobstores=jobstores)
# Задачи сохраняются в БД и восстанавливаются при перезапуске
scheduler.add_job(
func=important_task,
trigger="interval",
hours=1,
replace_existing=True
)
scheduler.start()
3. Django Celery — для асинхронных задач
Используется в больших Django проектах:
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# tasks.py
from celery import shared_task
from celery.schedules import crontab
@shared_task
def send_email_report(email):
# Отправить отчёт
print(f"Отправляем отчёт на {email}")
# Запланировать задачу
from celery.app import current_app
current_app.conf.beat_schedule = {
'send-report-every-day': {
'task': 'myapp.tasks.send_email_report',
'schedule': crontab(hour=9, minute=0),
'args': ('admin@example.com',)
},
'backup-every-hour': {
'task': 'myapp.tasks.backup_database',
'schedule': crontab(minute=0) # Каждый час
}
}
Запуск Celery
# Запустить worker, обрабатывающий задачи
celery -A myproject worker --loglevel=info
# Запустить Celery Beat (scheduler)
celery -A myproject beat
# Обычно запускают вместе
celery -A myproject worker --beat --loglevel=info
4. cron (системный инструмент)
Не Python, но интегрируется с Python скриптами:
# Отредактировать crontab
crontab -e
# Добавить строки
# Запустить скрипт каждый день в 10:30
30 10 * * * /usr/bin/python3 /home/user/send_report.py
# Запустить каждый час
0 * * * * /usr/bin/python3 /home/user/check_status.py
# Запустить каждый понедельник в 9:00
0 9 * * 1 /usr/bin/python3 /home/user/weekly_report.py
# Отправить выход на почту
30 10 * * * /usr/bin/python3 /home/user/backup.py | mail -s "Backup Report" admin@example.com
# Python скрипт для cron
#!/usr/bin/env python3
import sys
import logging
from datetime import datetime
logging.basicConfig(
filename='/var/log/backup.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
try:
backup_database()
logging.info("Backup успешен")
except Exception as e:
logging.error(f"Ошибка backup: {e}")
sys.exit(1)
5. systemd timer (Linux)
Модернизированная альтернатива cron:
# /etc/systemd/system/backup.service
[Unit]
Description=Daily Database Backup
After=network.target
[Service]
Type=oneshot
ExecStart=/usr/bin/python3 /home/user/backup.py
User=appuser
[Install]
WantedBy=multi-user.target
# /etc/systemd/system/backup.timer
[Unit]
Description=Run database backup daily
Requires=backup.service
[Timer]
OnCalendar=daily
OnCalendar=*-*-* 02:00:00
Persistent=true
[Install]
WantedBy=timers.target
# Включить и запустить
sudo systemctl enable backup.timer
sudo systemctl start backup.timer
# Проверить статус
sudo systemctl status backup.timer
6. Kubernetes CronJob (для Docker)
Для контейнеризованных приложений:
apiVersion: batch/v1
kind: CronJob
metadata:
name: backup-job
spec:
schedule: "0 2 * * *" # Каждый день в 02:00
jobTemplate:
spec:
template:
spec:
containers:
- name: backup
image: myapp:latest
command: ["python", "backup.py"]
restartPolicy: OnFailure
7. Task Queues — многопроцессная обработка
Redis Queue (RQ)
from rq import Queue
from redis import Redis
import time
redis_conn = Redis()
q = Queue(connection=redis_conn)
def long_running_task(x):
time.sleep(5)
return x * x
# Поставить задачу в очередь
job = q.enqueue(long_running_task, 5)
# Проверить статус
print(job.get_status()) # queued, started, finished, failed
# Получить результат
if job.is_finished:
print(job.result) # 25
# Запустить worker в отдельном процессе
rq worker
Сравнение инструментов
| Инструмент | Сложность | Масштабируемость | Сохранение | Лучше для |
|---|---|---|---|---|
| schedule | Очень простая | Одна машина | Нет | Простые скрипты |
| APScheduler | Средняя | Одна машина | Да (опционально) | Standalone Python |
| Celery | Сложная | Много машин | Да (Redis/RabbitMQ) | Большие Django проекты |
| cron | Простая | Одна машина | Да | System-level задачи |
| systemd timer | Средняя | Одна машина | Да | Linux сервера |
| Kubernetes | Сложная | Много машин | Да | Cloud/Docker |
| RQ | Средняя | Много машин | Да (Redis) | Асинхронные задачи |
Практические советы
1. Логирование
import logging
from apscheduler.schedulers.background import BackgroundScheduler
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
scheduler = BackgroundScheduler()
scheduler.start()
2. Обработка ошибок
from apscheduler.schedulers.background import BackgroundScheduler
def error_handler(event):
if event.exception:
print(f'Задача упала: {event.exception}')
else:
print('Задача отменена')
scheduler = BackgroundScheduler()
scheduler.add_listener(error_handler)
3. Мониторинг выполнения
from apscheduler.schedulers.background import BackgroundScheduler
import time
def task_with_timeout():
start = time.time()
try:
long_task()
except Exception as e:
elapsed = time.time() - start
log_error(f"Task failed after {elapsed}s: {e}")
raise
Вывод
- schedule — для простых скриптов
- APScheduler — для standalone Python приложений
- Celery — для больших Django/Flask проектов
- cron/systemd — для системных задач
- Kubernetes CronJob — для Docker контейнеров
- RQ — для асинхронных задач с Redis