← Назад к вопросам

В каком потоке создается задача Celery при её выполнении

3.0 Senior🔥 61 комментариев
#Асинхронность и многопоточность#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Потоки выполнения Celery задач

Это интересный вопрос про внутреннее устройство Celery. Расскажу о том, как Celery управляет потоками и процессами при выполнении задач.

Архитектура Celery

┌─────────────────┐
│ Django/FastAPI  │ (основное приложение)
└────────┬────────┘
         │ task.delay() / task.apply_async()
         ↓
┌─────────────────┐
│ Message Broker  │ (RabbitMQ, Redis)
│ [task queue]    │
└────────┬────────┘
         │
         ↓
┌──────────────────────────────┐
│ Celery Worker               │
│ (отдельный процесс)         │
│ ┌────────────────────────┐  │
│ │ Pool Manager           │  │
│ │ (управляет потоками)  │  │
│ ├────────────────────────┤  │
│ │ Thread/Process 1      │  │
│ │ [executing task 1]    │  │
│ ├────────────────────────┤  │
│ │ Thread/Process 2      │  │
│ │ [executing task 2]    │  │
│ └────────────────────────┘  │
└──────────────────────────────┘

Где выполняется задача

Ответ зависит от типа pool в Celery:

1. Prefork Pool (по умолчанию) — ПРОЦЕССЫ

Это самый популярный pool. Каждая задача выполняется в отдельном процессе:

# celery.py
from celery import Celery

app = Celery('myapp')
app.conf.update(
    broker_url='redis://localhost:6379',
    worker_pool='prefork',  # По умолчанию
    worker_concurrency=4,   # 4 процесса
)

@app.task
def send_email(email):
    print(f"Запущена в процессе {os.getpid()}")
    time.sleep(2)
    return f"Email отправлен на {email}"

При запуске:

$ celery -A myapp worker --loglevel=info

Celery v5.3.0 (sun-sailor) linux-gnu

Worker:
  . app.tasks.send_email

[2024-01-15 10:00:00,000] ESTABLISHED CONNECTION with broker
[2024-01-15 10:00:00,100] celery@worker1: Calling app.tasks.send_email['task-id-123']

Внутри worker'а:

Main Process (PID 1234)
├─ Subprocess 1 (PID 1235) — выполняет send_email
├─ Subprocess 2 (PID 1236) — выполняет другую задачу
├─ Subprocess 3 (PID 1237) — свободен
└─ Subprocess 4 (PID 1238) — свободен

Ключевой момент: задача выполняется в отдельном процессе, не в главном потоке worker'а.

2. Threads Pool — ПОТОКИ

Если явно указать threads pool:

app.conf.update(
    worker_pool='threads',
    worker_concurrency=10,  # 10 потоков
)

Тогда задачи выполняются в потоках внутри одного процесса:

Main Process (PID 1234)
├─ Thread 1 — выполняет send_email
├─ Thread 2 — выполняет другую задачу
├─ Thread 3 — свободен
└─ ...

Важно: все потоки делят одну память, поэтому

  • Гораздо дешевле (меньше памяти)
  • Опасно с GIL (только один поток может выполнять Python код одновременно)
  • Хорошо для I/O-bound операций

3. Solo Pool — ГЛАВНЫЙ ПРОЦЕСС

Для отладки или простых случаев:

app.conf.update(
    worker_pool='solo',
)

Задача выполняется в главном процессе worker'а:

Main Process (PID 1234)
├─ Выполняем send_email прямо здесь
├─ Всё остальное ждёт
└─ Процесс блокирован до завершения

Используется только в разработке, потому что:

  • Блокирует worker
  • Только одна задача в раз
  • Нет параллелизма

Детальный пример с Prefork

import os
import time
from celery import Celery

app = Celery('myapp')
app.conf.update(
    broker_url='redis://localhost:6379',
    worker_pool='prefork',
    worker_concurrency=2,
)

@app.task
def long_task(duration):
    pid = os.getpid()  # ID процесса, где выполняется
    parent_pid = os.getppid()  # ID parent process
    print(f"Task выполняется в процессе {pid} (parent: {parent_pid})")
    time.sleep(duration)
    return f"Завершено в процессе {pid}"

if __name__ == "__main__":
    # Отправить 3 задачи
    task1 = long_task.delay(5)
    task2 = long_task.delay(3)
    task3 = long_task.delay(4)
    
    print(f"Main process: {os.getpid()}")

Вывод из worker'а:

celery@worker1: Concurrency: 2 (prefork)

Task выполняется в процессе 1235 (parent: 1234)
Task выполняется в процессе 1236 (parent: 1234)
Task выполняется в процессе 1235 (parent: 1234)  ← переиспользуется процесс
Завершено в процессе 1235
Завершено в процессе 1236
Завершено в процессе 1235

Видно:

  • worker запустил 2 подпроцесса (concurrency=2)
  • первые две задачи выполняются параллельно (в разных процессах)
  • третья задача ждёт, пока процесс освободится

Жизненный цикл задачи

┌─────────────────────────────────────┐
│ 1. Отправка задачи                  │
│    task.delay(args)                 │
│    → serialize → send to broker      │
└────────────┬────────────────────────┘
             ↓
┌─────────────────────────────────────┐
│ 2. Задача в очереди (Redis/RabbitMQ)│
│    Статус: PENDING                  │
└────────────┬────────────────────────┘
             ↓
┌─────────────────────────────────────┐
│ 3. Worker получил задачу            │
│    Статус: RECEIVED                 │
└────────────┬────────────────────────┘
             ↓
┌─────────────────────────────────────┐
│ 4. Создаётся/переиспользуется       │
│    процесс (prefork) или поток      │
│    Статус: STARTED                  │
└────────────┬────────────────────────┘
             ↓
┌─────────────────────────────────────┐
│ 5. Выполняется функция в процессе   │
│    @app.task                        │
│    Статус: PROGRESS (опционально)   │
└────────────┬────────────────────────┘
             ↓
┌─────────────────────────────────────┐
│ 6. Результат сохраняется            │
│    в результат backend (Redis/DB)   │
│    Статус: SUCCESS или FAILURE      │
└─────────────────────────────────────┘

Процесс vs Поток: при выборе?

АспектPrefork (Процессы)Threads
ПамятьБольше (~40MB/process)Мало (~2MB/thread)
БезопасностьИзолированыГонка за данные
I/O-boundХорошоОтлично (GIL)
CPU-boundОтличноПлохо (GIL)
Фиксированные задачиПереиспользуютсяПереиспользуются
Memory leaksАвтоиспление (новый процесс)Накапливаются

Когда какой pool выбрать

Используй Prefork (по умолчанию)

# Стандартно, безопасно, надёжно
app.conf.worker_pool = 'prefork'
app.conf.worker_concurrency = 4

Для:

  • Production
  • CPU-bound задачи
  • Длительные задачи
  • Когда не уверен

Используй Threads

app.conf.worker_pool = 'threads'
app.conf.worker_concurrency = 10

Для:

  • I/O-bound задачи (requests, DB queries)
  • Когда нужно экономить память
  • Когда задачи короткие

Используй Solo

app.conf.worker_pool = 'solo'

Для:

  • Development и отладка
  • Synchronous processing
  • Когда задач одна в раз

Вывод

Ответ на вопрос "в каком потоке выполняется задача":

По умолчанию — в отдельном ПРОЦЕССЕ (prefork pool).

Это идеальное решение для большинства случаев, потому что:

  1. Полная изоляция каждой задачи
  2. Не страдает от GIL
  3. Автоматически переиспользуются процессы
  4. Memory leaks ограничены (новый процесс = чистая память)
  5. Хорошая безопасность и надёжность

Если нужна одна задача за раз или очень много I/O, можно выбрать threads pool для экономии памяти.