Как устроен механизм работы планировщика?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Механизм работы планировщика в Python
Планировщик (scheduler) — это компонент, управляющий выполнением задач в определенное время или периодически. Рассмотрим основные подходы и их устройство.
1. APScheduler — основной фреймворк
Мост встреч используемый планировщик в Python. Он работает на трех компонентах:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
scheduler = AsyncIOScheduler()
@scheduler.scheduled_job('interval', seconds=30)
async def my_task():
print(f"Выполнение задачи: {datetime.now()}")
# Запуск планировщика
scheduler.start()
2. Архитектура APScheduler
Три ключевых компонента:
Scheduler — управляет основным циклом:
class Scheduler:
def __init__(self):
self.executor = ThreadPoolExecutor()
self.job_store = MemoryJobStore() # Где хранятся задачи
self.trigger_store = {} # Когда выполнять
def add_job(self, func, trigger, args=None):
job = Job(func, trigger, args)
self.job_store[job.id] = job
return job
def start(self):
while True:
now = datetime.now()
# Проверяем все задачи
for job in self.job_store.values():
if job.trigger.should_run(now):
self.executor.submit(job.func, *job.args)
time.sleep(0.1)
Trigger — определяет когда выполнять:
# Interval trigger — каждые N секунд
scheduler.add_job(task, 'interval', seconds=30)
# Cron trigger — по расписанию
scheduler.add_job(task, 'cron', hour=0, minute=0) # Ежедневно в полночь
# Date trigger — один раз в определенное время
from datetime import datetime
scheduler.add_job(task, 'date', run_date=datetime(2025, 12, 25, 12, 0))
# Custom trigger
class CustomTrigger:
def should_run(self, now):
return now.minute % 15 == 0 # Каждые 15 минут
Executor — выполняет задачи:
from concurrent.futures import ThreadPoolExecutor
# ThreadPoolExecutor — многопоточность
scheduler.configure(executors={
'default': ThreadPoolExecutor(max_workers=5)
})
# ProcessPoolExecutor — многопроцессность
from concurrent.futures import ProcessPoolExecutor
scheduler.configure(executors={
'default': ProcessPoolExecutor(max_workers=2)
})
3. Сохранение состояния (Job Store)
Типы хранилищ:
# Memory — теряется при перезагрузке
from apscheduler.jobstores.memory import MemoryJobStore
scheduler.add_jobstore(MemoryJobStore())
# Database — сохраняется в БД
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:pass@localhost/jobs')
jobstore = SQLAlchemyJobStore(engine=engine)
scheduler.add_jobstore(jobstore, 'default')
# Redis — быстрое хранилище
from apscheduler.jobstores.redis import RedisJobStore
jobstore = RedisJobStore(connection=redis.Redis())
scheduler.add_jobstore(jobstore, 'default')
4. Как работает цикл планировщика
class Scheduler:
def start(self):
while self.running:
# 1. Получаем текущее время
now = datetime.now(pytz.UTC)
# 2. Проверяем все задачи
for job in self.job_store.get_due_jobs(now):
# 3. Задача готова к выполнению
self.executor.submit(self._run_job, job)
# 4. Спим некоторое время
time.sleep(self.misfire_grace_time) # ~1 сек
def _run_job(self, job):
try:
# Выполняем функцию
result = job.func(*job.args, **job.kwargs)
# Логируем успех
self.logger.info(f"Job {job.id} completed")
except Exception as e:
# Обработка ошибок (misfire_handler)
self.logger.error(f"Job {job.id} failed: {e}")
5. Асинхронный планировщик
Для async функций используй AsyncIOScheduler:
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
@scheduler.scheduled_job('interval', seconds=30)
async def async_task():
result = await some_async_operation()
print(result)
async def main():
scheduler.start()
# Планировщик работает в фоне
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
scheduler.shutdown()
asyncio.run(main())
6. Обработка ошибок и переполнений
def error_handler(exc):
print(f"Ошибка в задаче: {exc}")
scheduler.add_listener(error_handler, EVENT_JOB_ERROR)
# Misfire — если задача не выполнилась в срок
scheduler.add_job(
task,
'interval',
seconds=10,
misfire_grace_time=60 # Если опоздали на 60 сек, выполнить
)
7. Cilery как альтернатива
Для распределённых систем используй Celery с Celery Beat:
from celery import Celery
from celery.schedules import crontab
app = Celery('myapp', broker='redis://localhost:6379')
# Периодическая задача
app.conf.beat_schedule = {
'check-status': {
'task': 'tasks.check_status',
'schedule': crontab(minute='*/5'), # Каждые 5 минут
},
}
@app.task
def check_status():
# Выполняется на любом воркере
return "Status checked"
8. Практический пример
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime
class TaskScheduler:
def __init__(self):
self.scheduler = AsyncIOScheduler()
async def cleanup_old_data(self):
print(f"Очистка старых данных: {datetime.now()}")
# Удаляем записи старше 30 дней
async def send_notifications(self):
print(f"Отправка уведомлений: {datetime.now()}")
# Отправляем уведомления пользователям
def start(self):
# Добавляем задачи
self.scheduler.add_job(self.cleanup_old_data, 'interval', hours=24)
self.scheduler.add_job(self.send_notifications, 'cron', hour=9, minute=0)
self.scheduler.start()
async def run(self):
self.start()
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
self.scheduler.shutdown()
# Использование
async def main():
task_scheduler = TaskScheduler()
await task_scheduler.run()
asyncio.run(main())
Итог: Планировщики работают на основе цикла, проверяющего условия выполнения (trigger). APScheduler — для синхронных задач, AsyncIOScheduler — для async, Celery Beat — для распределённых систем. Правильный выбор зависит от архитектуры приложения.