← Назад к вопросам

Какие знаешь методы для получения результата асинхронной задачи?

2.0 Middle🔥 171 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Методы получения результата асинхронной задачи

В Python есть несколько способов получить результат из асинхронной задачи. Выбор зависит от фреймворка и требований.

1. Celery: AsyncResult.get()

Синхронное ожидание результата:

from celery import Celery
import time

app = Celery('tasks')

@app.task
def long_task(x, y):
    time.sleep(5)
    return x + y

# Запустить задачу асинхронно
result = long_task.delay(3, 4)  # Задача запущена в фоне
print(result.id)  # UUID задачи

# Метод 1: .get() — блокирующий вызов
try:
    answer = result.get(timeout=10)  # Ждём максимум 10 сек
    print(f"Result: {answer}")  # 7
except Exception as e:
    print(f"Task failed: {e}")

# Проверить готовность результата
if result.ready():
    print("Task completed")
else:
    print("Task still running")

# Получить результат без ожидания (если готов)
if result.ready():
    print(result.result)  # Результат если задача завершена

Параметры .get():

  • timeout — максимальное время ожидания в секундах
  • propagate — пробросить исключение если задача упала
  • interval — интервал проверки статуса

Проблемы:

  • Блокирует текущий поток
  • Если задача упала, .get() выбросит исключение

2. Celery: Polling (опрос результата)

Периодическая проверка статуса:

from celery import Celery
import time

app = Celery('tasks')

@app.task
def slow_task(n):
    time.sleep(n)
    return f"Done after {n} seconds"

# Запустить задачу
result = slow_task.delay(10)

# Опрашивать результат в цикле
while not result.ready():
    print(f"Status: {result.status}")
    time.sleep(1)

print(f"Final result: {result.result}")

# Проверить различные статусы
print(f"State: {result.state}")
# PENDING — задача ещё не выполняется
# STARTED — задача выполняется
# SUCCESS — задача успешно завершена
# FAILURE — задача упала
# RETRY — задача будет повторена

Статусы Celery:

  • PENDING — задача в очереди
  • STARTED — выполняется
  • SUCCESS — успешно
  • FAILURE — ошибка
  • RETRY — повтор
  • REVOKED — отменена

3. Celery: Callback (обратный вызов)

Выполнить код при завершении задачи:

from celery import Celery, group, chain

app = Celery('tasks')

@app.task
def process_data(data):
    time.sleep(2)
    return {"processed": data, "count": len(data)}

@app.task
def send_notification(result):
    """Вызовется после завершения process_data"""
    print(f"Processing complete! Result: {result}")
    # Отправить email, webhook, и т.д.

# Связать задачи с callback
chain(
    process_data.s([1, 2, 3]),
    send_notification.s()
).apply_async()

# Или с link (простой callback)
result = process_data.delay([1, 2, 3])
result.link(send_notification.s())  # Вызвать после

# С обработкой ошибок
result = process_data.delay([1, 2, 3])
result.link_error(handle_error.s())  # Вызвать при ошибке

4. Celery: Chord (параллельные + финальная задача)

Запустить много задач параллельно, потом обработать результаты:

from celery import Celery, chord

app = Celery('tasks')

@app.task
def process_file(filename):
    print(f"Processing {filename}...")
    return {"file": filename, "lines": 100}

@app.task
def combine_results(results):
    """Получит список результатов от всех процесс_file"""
    total_lines = sum(r["lines"] for r in results)
    print(f"Total lines: {total_lines}")
    return {"total": total_lines, "files_processed": len(results)}

# Запустить много процессов параллельно, потом объединить
files = ["file1.txt", "file2.txt", "file3.txt"]

workflow = chord(
    [process_file.s(f) for f in files]  # Параллельная обработка
)(combine_results.s())  # Потом объединение результатов

result = workflow.apply_async()
final_result = result.get()  # Ждём финального результата
print(final_result)

5. asyncio: await (встроенный Python)

Встроенная асинхронность в Python 3.7+:

import asyncio
import time

async def fetch_data(url, delay=2):
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # Имитация сетевого запроса
    return {"url": url, "data": "response"}

async def main():
    # Способ 1: await (ждём результат)
    result = await fetch_data("https://api.example.com/data")
    print(f"Result: {result}")
    
    # Способ 2: параллельное выполнение
    results = await asyncio.gather(
        fetch_data("https://api.example.com/data1"),
        fetch_data("https://api.example.com/data2"),
        fetch_data("https://api.example.com/data3")
    )
    print(f"All results: {results}")
    
    # Способ 3: create_task (запустить без ожидания)
    task = asyncio.create_task(fetch_data("https://..."))
    # Делать что-то ещё
    result = await task  # Ждём результат позже

asyncio.run(main())

Различные паттерны asyncio:

import asyncio

# Основной паттерн
async def example():
    # await блокирует до завершения
    result = await async_function()

# Параллельное выполнение
async def parallel():
    # Все три выполняются одновременно
    results = await asyncio.gather(
        async_func1(),
        async_func2(),
        async_func3()
    )

# Первый готовый результат
async def first_done():
    done, pending = await asyncio.wait(
        [async_func1(), async_func2()],
        return_when=asyncio.FIRST_COMPLETED
    )
    for task in done:
        print(task.result())

# С timeout
async def with_timeout():
    try:
        result = await asyncio.wait_for(
            slow_async_func(),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        print("Timeout!")

6. FastAPI: Background Tasks

Запустить задачу в фоне, вернуть результат клиенту:

from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import JSONResponse

app = FastAPI()

def process_in_background(item_id: int, user_email: str):
    """Выполняется после возврата ответа клиенту"""
    print(f"Processing item {item_id} for {user_email}")
    time.sleep(5)  # Долгая операция
    print(f"Done with {item_id}")

@app.post("/items/{item_id}")
async def create_item(
    item_id: int,
    background_tasks: BackgroundTasks,
    user_email: str = "test@example.com"
):
    # Добавить задачу в фон
    background_tasks.add_task(
        process_in_background,
        item_id=item_id,
        user_email=user_email
    )
    # Вернуть результат сразу
    return {
        "item_id": item_id,
        "status": "processing",
        "message": "Task queued, processing in background"
    }

7. Threading: Event.wait()

Ждать результата в многопоточном приложении:

import threading
import time

class AsyncTask:
    def __init__(self):
        self.result = None
        self.event = threading.Event()  # Событие для синхронизации
        self.exception = None
    
    def run(self):
        try:
            time.sleep(2)
            self.result = 42
        except Exception as e:
            self.exception = e
        finally:
            self.event.set()  # Сигнализировать о завершении
    
    def get_result(self, timeout=10):
        # Ждём, пока результат будет готов
        if self.event.wait(timeout=timeout):
            if self.exception:
                raise self.exception
            return self.result
        else:
            raise TimeoutError("Task did not complete in time")

# Использование
task = AsyncTask()
thread = threading.Thread(target=task.run)
thread.start()

result = task.get_result()  # Ждём результат
print(result)  # 42

8. Concurrent.futures: Future

Встроенный способ для многопроцессных/многопоточных задач:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time

def heavy_computation(n):
    time.sleep(2)
    return n * n

# Метод 1: ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=3) as executor:
    # Запустить задачи
    future1 = executor.submit(heavy_computation, 5)
    future2 = executor.submit(heavy_computation, 10)
    future3 = executor.submit(heavy_computation, 15)
    
    # Получить результаты
    print(future1.result())  # 25 (блокирует)
    print(future2.result())
    print(future3.result())
    
    # Или как заканчиваются
    futures = [future1, future2, future3]
    for future in as_completed(futures):
        print(f"Result: {future.result()}")

# Метод 2: map (как для list comprehension)
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(heavy_computation, [5, 10, 15])
    for result in results:  # По мере завершения
        print(result)

Сравнение методов

МетодФреймворкБлокирует лиПараллельно
.get()CeleryАсинчая
awaitasyncioАсинхронно
CallbackCeleryАсинхронно
ChordCeleryПараллельно
future.result()concurrent.futuresМногопотоки
as_completedconcurrent.futures✅ (частично)Многопотоки

Практический пример: Комбинированный подход

from fastapi import FastAPI, BackgroundTasks
from celery import Celery
import redis

app = FastAPI()
celery = Celery('app')
redis_client = redis.Redis()

@celery.task
def heavy_processing(data):
    # Долгая операция
    result = sum(data) * 2
    return result

@app.post("/process")
async def submit_processing(data: list[int]):
    # Запустить в Celery
    result = heavy_processing.delay(data)
    
    # Сохранить ID в Redis
    redis_client.setex(
        f"task:{result.id}",
        3600,
        "processing"
    )
    
    return {"task_id": result.id}

@app.get("/result/{task_id}")
async def get_result(task_id: str):
    from celery.result import AsyncResult
    
    result = AsyncResult(task_id, app=celery)
    
    if result.state == 'PENDING':
        return {"status": "processing"}
    elif result.state == 'SUCCESS':
        return {"status": "done", "result": result.result}
    elif result.state == 'FAILURE':
        return {"status": "failed", "error": str(result.info)}

Вывод: Выбор метода зависит от контекста:

  • Celery — для распределённых асинхронных задач
  • asyncio — для I/O-bound операций
  • concurrent.futures — для CPU-bound и простого распараллеливания
  • FastAPI BackgroundTasks — для лёгких фоновых работ