← Назад к вопросам

Как устроен механизм работы планировщика?

1.8 Middle🔥 131 комментариев
#Архитектура и паттерны#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Механизм работы планировщика в Python

Планировщик (scheduler) — это компонент, управляющий выполнением задач в определенное время или периодически. Рассмотрим основные подходы и их устройство.

1. APScheduler — основной фреймворк

Мост встреч используемый планировщик в Python. Он работает на трех компонентах:

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime

scheduler = AsyncIOScheduler()

@scheduler.scheduled_job('interval', seconds=30)
async def my_task():
    print(f"Выполнение задачи: {datetime.now()}")

# Запуск планировщика
scheduler.start()

2. Архитектура APScheduler

Три ключевых компонента:

Scheduler — управляет основным циклом:

class Scheduler:
    def __init__(self):
        self.executor = ThreadPoolExecutor()
        self.job_store = MemoryJobStore()  # Где хранятся задачи
        self.trigger_store = {}  # Когда выполнять
    
    def add_job(self, func, trigger, args=None):
        job = Job(func, trigger, args)
        self.job_store[job.id] = job
        return job
    
    def start(self):
        while True:
            now = datetime.now()
            # Проверяем все задачи
            for job in self.job_store.values():
                if job.trigger.should_run(now):
                    self.executor.submit(job.func, *job.args)
            time.sleep(0.1)

Trigger — определяет когда выполнять:

# Interval trigger — каждые N секунд
scheduler.add_job(task, 'interval', seconds=30)

# Cron trigger — по расписанию
scheduler.add_job(task, 'cron', hour=0, minute=0)  # Ежедневно в полночь

# Date trigger — один раз в определенное время
from datetime import datetime
scheduler.add_job(task, 'date', run_date=datetime(2025, 12, 25, 12, 0))

# Custom trigger
class CustomTrigger:
    def should_run(self, now):
        return now.minute % 15 == 0  # Каждые 15 минут

Executor — выполняет задачи:

from concurrent.futures import ThreadPoolExecutor

# ThreadPoolExecutor — многопоточность
scheduler.configure(executors={
    'default': ThreadPoolExecutor(max_workers=5)
})

# ProcessPoolExecutor — многопроцессность
from concurrent.futures import ProcessPoolExecutor
scheduler.configure(executors={
    'default': ProcessPoolExecutor(max_workers=2)
})

3. Сохранение состояния (Job Store)

Типы хранилищ:

# Memory — теряется при перезагрузке
from apscheduler.jobstores.memory import MemoryJobStore
scheduler.add_jobstore(MemoryJobStore())

# Database — сохраняется в БД
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:pass@localhost/jobs')
jobstore = SQLAlchemyJobStore(engine=engine)
scheduler.add_jobstore(jobstore, 'default')

# Redis — быстрое хранилище
from apscheduler.jobstores.redis import RedisJobStore
jobstore = RedisJobStore(connection=redis.Redis())
scheduler.add_jobstore(jobstore, 'default')

4. Как работает цикл планировщика

class Scheduler:
    def start(self):
        while self.running:
            # 1. Получаем текущее время
            now = datetime.now(pytz.UTC)
            
            # 2. Проверяем все задачи
            for job in self.job_store.get_due_jobs(now):
                # 3. Задача готова к выполнению
                self.executor.submit(self._run_job, job)
            
            # 4. Спим некоторое время
            time.sleep(self.misfire_grace_time)  # ~1 сек
    
    def _run_job(self, job):
        try:
            # Выполняем функцию
            result = job.func(*job.args, **job.kwargs)
            # Логируем успех
            self.logger.info(f"Job {job.id} completed")
        except Exception as e:
            # Обработка ошибок (misfire_handler)
            self.logger.error(f"Job {job.id} failed: {e}")

5. Асинхронный планировщик

Для async функций используй AsyncIOScheduler:

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

@scheduler.scheduled_job('interval', seconds=30)
async def async_task():
    result = await some_async_operation()
    print(result)

async def main():
    scheduler.start()
    # Планировщик работает в фоне
    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        scheduler.shutdown()

asyncio.run(main())

6. Обработка ошибок и переполнений

def error_handler(exc):
    print(f"Ошибка в задаче: {exc}")

scheduler.add_listener(error_handler, EVENT_JOB_ERROR)

# Misfire — если задача не выполнилась в срок
scheduler.add_job(
    task,
    'interval',
    seconds=10,
    misfire_grace_time=60  # Если опоздали на 60 сек, выполнить
)

7. Cilery как альтернатива

Для распределённых систем используй Celery с Celery Beat:

from celery import Celery
from celery.schedules import crontab

app = Celery('myapp', broker='redis://localhost:6379')

# Периодическая задача
app.conf.beat_schedule = {
    'check-status': {
        'task': 'tasks.check_status',
        'schedule': crontab(minute='*/5'),  # Каждые 5 минут
    },
}

@app.task
def check_status():
    # Выполняется на любом воркере
    return "Status checked"

8. Практический пример

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime

class TaskScheduler:
    def __init__(self):
        self.scheduler = AsyncIOScheduler()
    
    async def cleanup_old_data(self):
        print(f"Очистка старых данных: {datetime.now()}")
        # Удаляем записи старше 30 дней
    
    async def send_notifications(self):
        print(f"Отправка уведомлений: {datetime.now()}")
        # Отправляем уведомления пользователям
    
    def start(self):
        # Добавляем задачи
        self.scheduler.add_job(self.cleanup_old_data, 'interval', hours=24)
        self.scheduler.add_job(self.send_notifications, 'cron', hour=9, minute=0)
        self.scheduler.start()
    
    async def run(self):
        self.start()
        try:
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            self.scheduler.shutdown()

# Использование
async def main():
    task_scheduler = TaskScheduler()
    await task_scheduler.run()

asyncio.run(main())

Итог: Планировщики работают на основе цикла, проверяющего условия выполнения (trigger). APScheduler — для синхронных задач, AsyncIOScheduler — для async, Celery Beat — для распределённых систем. Правильный выбор зависит от архитектуры приложения.