← Назад к вопросам
Какие знаешь методы для получения результата асинхронной задачи?
2.0 Middle🔥 171 комментариев
#Python Core
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Методы получения результата асинхронной задачи
В Python есть несколько способов получить результат из асинхронной задачи. Выбор зависит от фреймворка и требований.
1. Celery: AsyncResult.get()
Синхронное ожидание результата:
from celery import Celery
import time
app = Celery('tasks')
@app.task
def long_task(x, y):
time.sleep(5)
return x + y
# Запустить задачу асинхронно
result = long_task.delay(3, 4) # Задача запущена в фоне
print(result.id) # UUID задачи
# Метод 1: .get() — блокирующий вызов
try:
answer = result.get(timeout=10) # Ждём максимум 10 сек
print(f"Result: {answer}") # 7
except Exception as e:
print(f"Task failed: {e}")
# Проверить готовность результата
if result.ready():
print("Task completed")
else:
print("Task still running")
# Получить результат без ожидания (если готов)
if result.ready():
print(result.result) # Результат если задача завершена
Параметры .get():
timeout— максимальное время ожидания в секундахpropagate— пробросить исключение если задача упалаinterval— интервал проверки статуса
Проблемы:
- Блокирует текущий поток
- Если задача упала, .get() выбросит исключение
2. Celery: Polling (опрос результата)
Периодическая проверка статуса:
from celery import Celery
import time
app = Celery('tasks')
@app.task
def slow_task(n):
time.sleep(n)
return f"Done after {n} seconds"
# Запустить задачу
result = slow_task.delay(10)
# Опрашивать результат в цикле
while not result.ready():
print(f"Status: {result.status}")
time.sleep(1)
print(f"Final result: {result.result}")
# Проверить различные статусы
print(f"State: {result.state}")
# PENDING — задача ещё не выполняется
# STARTED — задача выполняется
# SUCCESS — задача успешно завершена
# FAILURE — задача упала
# RETRY — задача будет повторена
Статусы Celery:
PENDING— задача в очередиSTARTED— выполняетсяSUCCESS— успешноFAILURE— ошибкаRETRY— повторREVOKED— отменена
3. Celery: Callback (обратный вызов)
Выполнить код при завершении задачи:
from celery import Celery, group, chain
app = Celery('tasks')
@app.task
def process_data(data):
time.sleep(2)
return {"processed": data, "count": len(data)}
@app.task
def send_notification(result):
"""Вызовется после завершения process_data"""
print(f"Processing complete! Result: {result}")
# Отправить email, webhook, и т.д.
# Связать задачи с callback
chain(
process_data.s([1, 2, 3]),
send_notification.s()
).apply_async()
# Или с link (простой callback)
result = process_data.delay([1, 2, 3])
result.link(send_notification.s()) # Вызвать после
# С обработкой ошибок
result = process_data.delay([1, 2, 3])
result.link_error(handle_error.s()) # Вызвать при ошибке
4. Celery: Chord (параллельные + финальная задача)
Запустить много задач параллельно, потом обработать результаты:
from celery import Celery, chord
app = Celery('tasks')
@app.task
def process_file(filename):
print(f"Processing {filename}...")
return {"file": filename, "lines": 100}
@app.task
def combine_results(results):
"""Получит список результатов от всех процесс_file"""
total_lines = sum(r["lines"] for r in results)
print(f"Total lines: {total_lines}")
return {"total": total_lines, "files_processed": len(results)}
# Запустить много процессов параллельно, потом объединить
files = ["file1.txt", "file2.txt", "file3.txt"]
workflow = chord(
[process_file.s(f) for f in files] # Параллельная обработка
)(combine_results.s()) # Потом объединение результатов
result = workflow.apply_async()
final_result = result.get() # Ждём финального результата
print(final_result)
5. asyncio: await (встроенный Python)
Встроенная асинхронность в Python 3.7+:
import asyncio
import time
async def fetch_data(url, delay=2):
print(f"Fetching {url}...")
await asyncio.sleep(delay) # Имитация сетевого запроса
return {"url": url, "data": "response"}
async def main():
# Способ 1: await (ждём результат)
result = await fetch_data("https://api.example.com/data")
print(f"Result: {result}")
# Способ 2: параллельное выполнение
results = await asyncio.gather(
fetch_data("https://api.example.com/data1"),
fetch_data("https://api.example.com/data2"),
fetch_data("https://api.example.com/data3")
)
print(f"All results: {results}")
# Способ 3: create_task (запустить без ожидания)
task = asyncio.create_task(fetch_data("https://..."))
# Делать что-то ещё
result = await task # Ждём результат позже
asyncio.run(main())
Различные паттерны asyncio:
import asyncio
# Основной паттерн
async def example():
# await блокирует до завершения
result = await async_function()
# Параллельное выполнение
async def parallel():
# Все три выполняются одновременно
results = await asyncio.gather(
async_func1(),
async_func2(),
async_func3()
)
# Первый готовый результат
async def first_done():
done, pending = await asyncio.wait(
[async_func1(), async_func2()],
return_when=asyncio.FIRST_COMPLETED
)
for task in done:
print(task.result())
# С timeout
async def with_timeout():
try:
result = await asyncio.wait_for(
slow_async_func(),
timeout=5.0
)
except asyncio.TimeoutError:
print("Timeout!")
6. FastAPI: Background Tasks
Запустить задачу в фоне, вернуть результат клиенту:
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import JSONResponse
app = FastAPI()
def process_in_background(item_id: int, user_email: str):
"""Выполняется после возврата ответа клиенту"""
print(f"Processing item {item_id} for {user_email}")
time.sleep(5) # Долгая операция
print(f"Done with {item_id}")
@app.post("/items/{item_id}")
async def create_item(
item_id: int,
background_tasks: BackgroundTasks,
user_email: str = "test@example.com"
):
# Добавить задачу в фон
background_tasks.add_task(
process_in_background,
item_id=item_id,
user_email=user_email
)
# Вернуть результат сразу
return {
"item_id": item_id,
"status": "processing",
"message": "Task queued, processing in background"
}
7. Threading: Event.wait()
Ждать результата в многопоточном приложении:
import threading
import time
class AsyncTask:
def __init__(self):
self.result = None
self.event = threading.Event() # Событие для синхронизации
self.exception = None
def run(self):
try:
time.sleep(2)
self.result = 42
except Exception as e:
self.exception = e
finally:
self.event.set() # Сигнализировать о завершении
def get_result(self, timeout=10):
# Ждём, пока результат будет готов
if self.event.wait(timeout=timeout):
if self.exception:
raise self.exception
return self.result
else:
raise TimeoutError("Task did not complete in time")
# Использование
task = AsyncTask()
thread = threading.Thread(target=task.run)
thread.start()
result = task.get_result() # Ждём результат
print(result) # 42
8. Concurrent.futures: Future
Встроенный способ для многопроцессных/многопоточных задач:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
def heavy_computation(n):
time.sleep(2)
return n * n
# Метод 1: ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=3) as executor:
# Запустить задачи
future1 = executor.submit(heavy_computation, 5)
future2 = executor.submit(heavy_computation, 10)
future3 = executor.submit(heavy_computation, 15)
# Получить результаты
print(future1.result()) # 25 (блокирует)
print(future2.result())
print(future3.result())
# Или как заканчиваются
futures = [future1, future2, future3]
for future in as_completed(futures):
print(f"Result: {future.result()}")
# Метод 2: map (как для list comprehension)
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(heavy_computation, [5, 10, 15])
for result in results: # По мере завершения
print(result)
Сравнение методов
| Метод | Фреймворк | Блокирует ли | Параллельно |
|---|---|---|---|
| .get() | Celery | ✅ | Асинчая |
| await | asyncio | ✅ | Асинхронно |
| Callback | Celery | ❌ | Асинхронно |
| Chord | Celery | ❌ | Параллельно |
| future.result() | concurrent.futures | ✅ | Многопотоки |
| as_completed | concurrent.futures | ✅ (частично) | Многопотоки |
Практический пример: Комбинированный подход
from fastapi import FastAPI, BackgroundTasks
from celery import Celery
import redis
app = FastAPI()
celery = Celery('app')
redis_client = redis.Redis()
@celery.task
def heavy_processing(data):
# Долгая операция
result = sum(data) * 2
return result
@app.post("/process")
async def submit_processing(data: list[int]):
# Запустить в Celery
result = heavy_processing.delay(data)
# Сохранить ID в Redis
redis_client.setex(
f"task:{result.id}",
3600,
"processing"
)
return {"task_id": result.id}
@app.get("/result/{task_id}")
async def get_result(task_id: str):
from celery.result import AsyncResult
result = AsyncResult(task_id, app=celery)
if result.state == 'PENDING':
return {"status": "processing"}
elif result.state == 'SUCCESS':
return {"status": "done", "result": result.result}
elif result.state == 'FAILURE':
return {"status": "failed", "error": str(result.info)}
Вывод: Выбор метода зависит от контекста:
- Celery — для распределённых асинхронных задач
- asyncio — для I/O-bound операций
- concurrent.futures — для CPU-bound и простого распараллеливания
- FastAPI BackgroundTasks — для лёгких фоновых работ