← Назад к вопросам

Для каких задач используются корутина

1.8 Middle🔥 251 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Корутины в Python: назначение и применение

Корутина — это обобщение функции, которая может приостанавливать своё выполнение и передавать управление другому коду, а затем продолжать работу с того же места. Это основа асинхронного программирования в Python.

Основные задачи, для которых используются корутины

1. I/O операции (главное применение)

Корутины идеальны для операций, которые требуют ожидания (I/O-bound):

Сетевые запросы

import asyncio
import aiohttp

# Без корутин (синхронный код)
import requests

def fetch_urls_sync():
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 6)]
    results = []
    for url in urls:
        response = requests.get(url)  # Ждём каждый запрос!
        results.append(response.json())
    return results

# Время: 5 запросов × 0.5 сек = 2.5 сек

# С корутинами (асинхронный код)
async def fetch_urls_async():
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 6)]
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]  # Создаём все задачи
        responses = await asyncio.gather(*tasks)  # Выполняем параллельно!
        results = [await resp.json() for resp in responses]
    return results

# Время: все запросы параллельно = 0.5 сек

asyncio.run(fetch_urls_async())

Работа с БД

import asyncio
import asyncpg  # Асинхронный драйвер PostgreSQL

# Синхронный код (блокирует поток)
def get_users_sync(user_ids):
    import psycopg2
    conn = psycopg2.connect("dbname=mydb user=postgres")
    cursor = conn.cursor()
    
    users = []
    for user_id in user_ids:
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        users.append(cursor.fetchone())
    # Время: N запросов последовательно
    return users

# Асинхронный код с корутинами
async def get_users_async(user_ids):
    conn = await asyncpg.connect("postgresql://postgres@localhost/mydb")
    
    # Создаём корутины для всех запросов
    tasks = [conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id) 
             for user_id in user_ids]
    
    # Выполняем все параллельно
    users = await asyncio.gather(*tasks)
    await conn.close()
    return users

# Время: все запросы параллельно (вместо N × время_запроса)

Чтение/запись файлов

import asyncio
import aiofiles

# Синхронный код (блокирует)
def read_files_sync(filenames):
    results = []
    for filename in filenames:
        with open(filename, r) as f:
            results.append(f.read())  # Ждём I/O
    return results

# Асинхронный код
async def read_files_async(filenames):
    async def read_file(filename):
        async with aiofiles.open(filename, r) as f:
            return await f.read()
    
    # Все файлы читаются параллельно
    return await asyncio.gather(*[read_file(f) for f in filenames])

2. Веб-приложения (Web Servers)

Корутины необходимы для обработки тысяч одновременных подключений:

FastAPI (асинхронный фреймворк)

from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.get("/api/data/{item_id}")
async def get_data(item_id: int):
    # Эта корутина обрабатывает один запрос
    # FastAPI может обработать 1000+ корутин одновременно
    
    # Параллельные операции
    user = await fetch_user_from_db(item_id)
    recommendations = await fetch_recommendations(item_id)
    analytics = await log_analytics(item_id)
    
    return {
        "user": user,
        "recommendations": recommendations
    }

@app.post("/api/process")
async def process_data():
    # Обработка нескольких I/O операций параллельно
    results = await asyncio.gather(
        external_api_call_1(),
        external_api_call_2(),
        database_query(),
        cache_lookup()
    )
    return {"status": "processed", "data": results}

Сравнение пропускной способности

Синхронный сервер (Flask + threading):
- 1 поток = 1 запрос
- 100 потоков = 100 одновременных запросов
- Переключение контекста = overhead

Асинхронный сервер (FastAPI + asyncio):
- 1 поток = 1000+ корутин одновременно
- Нет переключения контекста
- 10-100x пропускная способность

3. Телеграм боты

Аiogram использует корутины для обработки обновлений:

from aiogram import Dispatcher, Router, F
from aiogram.types import Message

router = Router()

@router.message(F.text == "/start")
async def start_handler(message: Message):
    # Эта корутина обрабатывает сообщение
    await message.answer("Hello!")
    user = await fetch_user_from_db(message.from_user.id)
    
    # Параллельные операции
    await asyncio.gather(
        log_user_action(user),
        update_user_stats(user),
        send_notification(user)
    )

@router.message()
async def handle_all_messages(message: Message):
    # Может обработать 10000+ пользователей одновременно
    await message.answer(f"You wrote: {message.text}")

4. Долгоживущие фоновые задачи

Корутины используются для задач, которые периодически выполняют I/O:

import asyncio

# Фоновая задача
async def background_worker():
    while True:
        # Периодически проверяем очередь
        tasks = await fetch_pending_tasks()
        
        # Обрабатываем все задачи параллельно
        results = await asyncio.gather(*[
            process_task(task) for task in tasks
        ])
        
        await asyncio.sleep(5)  # Ждём 5 сек, потом повторяем

# Запуск фонового воркера
asyncio.create_task(background_worker())

Пример с Celery + asyncio

from celery import Celery
import asyncio

app = Celery(tasks)

@app.task
def process_emails_batch(email_list):
    # Celery вызывает эту задачу в фоне
    # Внутри используем asyncio для параллельной отправки
    
    async def send_emails():
        return await asyncio.gather(*[
            send_email_async(email) for email in email_list
        ])
    
    return asyncio.run(send_emails())

5. Websockets и real-time приложения

Корутины идеальны для постоянных соединений:

from fastapi import FastAPI, WebSocket
import asyncio

app = FastAPI()

@app.websocket("/ws/chat/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
    await websocket.accept()
    
    try:
        while True:
            # Получаем сообщение (корутина)
            data = await websocket.receive_text()
            
            # Параллельно: сохраняем, отправляем другим, логируем
            await asyncio.gather(
                save_message_to_db(room_id, data),
                broadcast_to_room(room_id, data),
                log_message(data)
            )
    except Exception:
        pass

6. Обработка потоков данных (Streaming)

import asyncio

# Потребитель (consumer)
async def process_items():
    async for item in async_item_generator():
        result = await process_item(item)
        await store_result(result)

# Асинхронный генератор
async def async_item_generator():
    for i in range(1000):
        await asyncio.sleep(0.1)  # Имитация I/O
        yield {"id": i, "data": f"item_{i}"}

7. Тайм-ауты и управление жизненным циклом

import asyncio

async def fetch_with_timeout():
    try:
        # Гарантированно выполнится максимум 5 секунд
        result = await asyncio.wait_for(
            slow_api_call(),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        print("API call took too long")

# Отмена задачи
async def main():
    task = asyncio.create_task(long_running_operation())
    
    await asyncio.sleep(2)
    task.cancel()  # Отменить задачу
    
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

Когда НЕ нужны корутины

❌ CPU-bound операции

# Плохо: корутины не помогают
async def compute_fibonacci(n):
    if n <= 1:
        return n
    return await compute_fibonacci(n-1) + await compute_fibonacci(n-2)
    # Всё равно медленно, потому что это CPU-bound!

# Хорошо: используй threading или multiprocessing
from multiprocessing import Pool

def compute_fibonacci(n):
    if n <= 1:
        return n
    return compute_fibonacci(n-1) + compute_fibonacci(n-2)

with Pool(4) as p:
    results = p.map(compute_fibonacci, [35, 36, 37, 38])

❌ Простые синхронные API

# Корутины усложняют код без пользы
async def simple_calculation():
    return 2 + 2

# Просто функция
def simple_calculation():
    return 2 + 2

Таблица: когда использовать корутины

ЗадачаНужны корутины?Причина
I/O операции (HTTP, БД, файлы)✅ ДаМожно обрабатывать много параллельно
CPU-bound вычисления❌ НетИспользуй threading/multiprocessing
Веб-сервер (1000+ запросов)✅ ДаНе нужны потоки, боль с синхронизацией
Простой скрипт❌ НетУсложняет код
Telegram bot✅ ДаМного одновременных пользователей
WebSocket соединения✅ ДаПостоянное соединение
Фоновые задачи (Celery)✅ ДаМного параллельных задач

Выводы

Корутины используются для I/O-bound операций: сетевые запросы, работа с БД, файлы, WebSockets. Основная выгода — можно обработать тысячи одновременных операций в одном потоке без overhead переключения контекста, как в многопоточности. Это критично для веб-приложений, ботов и real-time систем.