← Назад к вопросам

Что такое asyncio в Python и для каких задач его применяют?

1.0 Junior🔥 171 комментариев
#Python

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Asyncio в Python: асинхронное программирование для Data Engineer

Asyncio — это встроенная библиотека Python для написания асинхронного кода, который позволяет одновременно выполнять множество операций I/O (ввод-вывод) без использования потоков или процессов.

Что такое asyncio

Синхронный код (блокирующий):

# Когда мы ждём ответ от API, весь код замирает
import requests
import time

start = time.time()

# Запрос 1: 2 секунды
response1 = requests.get('https://api.example.com/data1')
print(f"Response 1: {response1.status_code}")  # ждём 2 сек

# Запрос 2: 2 секунды
response2 = requests.get('https://api.example.com/data2')
print(f"Response 2: {response2.status_code}")  # ждём ещё 2 сек

# Запрос 3: 2 секунды
response3 = requests.get('https://api.example.com/data3')
print(f"Response 3: {response3.status_code}")  # ждём ещё 2 сек

print(f"Total time: {time.time() - start:.1f}s")  # 6+ секунд! Неэффективно

Асинхронный код (неблокирующий) с asyncio:

import asyncio
import aiohttp  # асинхронная библиотека для HTTP
import time

async def fetch_data(session, url):
    """Асинхронная функция для получения данных"""
    async with session.get(url) as response:
        return await response.status_code  # ждём ответ

async def main():
    start = time.time()
    
    # Создаем асинхронную сессию
    async with aiohttp.ClientSession() as session:
        # Запускаем все 3 запроса ОДНОВРЕМЕННО
        tasks = [
            fetch_data(session, 'https://api.example.com/data1'),
            fetch_data(session, 'https://api.example.com/data2'),
            fetch_data(session, 'https://api.example.com/data3'),
        ]
        
        # Ждём, пока ВСЕ завершат (параллельно)
        results = await asyncio.gather(*tasks)
    
    print(f"Results: {results}")  # [200, 200, 200]
    print(f"Total time: {time.time() - start:.1f}s")  # ~2 секунды! Эффективно

# Запускаем асинхронную функцию
asyncio.run(main())

Сравнение подходов:

ПодходВремя для 3 запросовИспользование памятиСложность
Синхронный6 секундНизкоеПростой
Потоки (Threading)2 секундыСреднееСложный (race conditions)
Процессы (Multiprocessing)2 секундыВысокоеОчень сложный (IPC)
Asyncio2 секундыНизкоеСредне (await/async)

Event Loop: сердце asyncio

Как работает asyncio:

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)  # Асинхронная пауза
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")

async def main():
    # Event loop выполняет задачи в "круговую"
    # Когда task1 ждёт (await), event loop переключается на task2
    await asyncio.gather(task1(), task2())

asyncio.run(main())

# Вывод:
# Task 1 started
# Task 2 started
# Task 2 finished   (через 1 сек)
# Task 1 finished   (через 2 сек)
# Всего: 2 секунды, а не 3!

Диаграмма event loop:

Время  Event Loop              Task 1           Task 2
0ms    ┌─ Запустить task1 ──→  начало          
       ├─ Запустить task2 ──────────────────→ начало
       │
1000ms ├─ Task2 ждёт sleep  ← переключение ← await sleep(1)
       │  (готова)
       └─ Task2 завершится
       │
2000ms └─ Task1 ждёт sleep  ← переключение ← await sleep(2)
           (готова)
           Task1 завершится

Применение asyncio в Data Engineering

1. Загрузка данных из множества API

import asyncio
import aiohttp
from typing import List, Dict

async def fetch_user_data(session, user_id: int) -> Dict:
    """Загружает данные одного пользователя"""
    async with session.get(f'https://api.example.com/users/{user_id}') as resp:
        return await resp.json()

async def load_all_users(user_ids: List[int]) -> List[Dict]:
    """Загружает данные для всех пользователей параллельно"""
    async with aiohttp.ClientSession() as session:
        # Вместо 10000 последовательных запросов (2+ часа)
        # Делаем 1000 параллельных батчей по 10 запросов (~5 минут)
        tasks = [fetch_user_data(session, uid) for uid in user_ids]
        
        # Выполняем батчами для экономии памяти
        batch_size = 100
        results = []
        for i in range(0, len(tasks), batch_size):
            batch = tasks[i:i+batch_size]
            batch_results = await asyncio.gather(*batch)
            results.extend(batch_results)
        
        return results

# Использование
user_ids = list(range(1, 10001))  # 10K пользователей
data = asyncio.run(load_all_users(user_ids))

2. Параллельная обработка данных

import asyncio
import pandas as pd

async def process_file(file_path: str) -> pd.DataFrame:
    """Обрабатывает один файл"""
    # Имитируем долгую обработку
    await asyncio.sleep(1)
    df = pd.read_csv(file_path)
    return df.groupby('category').size()

async def process_all_files(file_paths: List[str]) -> Dict:
    """Обрабатывает все файлы параллельно"""
    tasks = [process_file(fp) for fp in file_paths]
    results = await asyncio.gather(*tasks)
    
    # Объединяем результаты
    combined = pd.concat(results, axis=1).fillna(0).sum(axis=1)
    return combined.to_dict()

# Использование
files = [f'data_2024_{i:02d}.csv' for i in range(1, 13)]  # 12 месяцев
results = asyncio.run(process_all_files(files))

3. Мониторинг и обновление БД

import asyncio
import asyncpg  # асинхронный драйвер PostgreSQL

async def update_user_status(pool, user_id: int, new_status: str):
    """Обновляет статус пользователя"""
    async with pool.acquire() as connection:
        await connection.execute(
            'UPDATE users SET status = $1 WHERE id = $2',
            new_status, user_id
        )

async def batch_update_users(dsn: str, updates: Dict[int, str]):
    """Обновляет статусы для множества пользователей"""
    # Создаём пул подключений
    pool = await asyncpg.create_pool(dsn, min_size=5, max_size=20)
    
    try:
        # Обновляем все пользователей параллельно
        tasks = [
            update_user_status(pool, user_id, status)
            for user_id, status in updates.items()
        ]
        await asyncio.gather(*tasks)
    finally:
        await pool.close()

# Использование
updates = {1: 'active', 2: 'inactive', 3: 'active', ...}  # 1M обновлений
await batch_update_users('postgresql://user:pass@localhost/db', updates)

4. Streaming данных из Kafka в реальном времени

import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

async def process_kafka_messages():
    """Читает сообщения из Kafka и обрабатывает их"""
    consumer = AIOKafkaConsumer(
        'input-topic',
        bootstrap_servers='localhost:9092',
        group_id='my-group'
    )
    
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092'
    )
    
    await consumer.start()
    await producer.start()
    
    try:
        async for message in consumer:
            # Обрабатываем сообщение
            data = message.value.decode('utf-8')
            result = process_event(data)  # быстрая обработка
            
            # Отправляем результат в другой топик
            await producer.send('output-topic', result.encode('utf-8'))
    finally:
        await consumer.stop()
        await producer.stop()

# Обработка 1M сообщений в реальном времени
await process_kafka_messages()

5. Оркестрация ETL процессов

import asyncio
import time

async def extract_data(source: str):
    """Extract фаза"""
    print(f"Extracting from {source}...")
    await asyncio.sleep(2)  # имитируем I/O
    return {"raw_data": f"from {source}"}

async def transform_data(data):
    """Transform фаза"""
    print("Transforming...")
    await asyncio.sleep(1)
    return {"transformed": data}

async def load_data(data, destination: str):
    """Load фаза"""
    print(f"Loading to {destination}...")
    await asyncio.sleep(1)
    return f"Loaded {len(data)} records"

async def etl_pipeline():
    """Полный ETL pipeline с параллельным извлечением"""
    # Извлекаем из 3 источников параллельно
    start = time.time()
    
    extracted_data = await asyncio.gather(
        extract_data('api_1'),
        extract_data('api_2'),
        extract_data('api_3'),
    )
    
    # Трансформируем
    transformed = await transform_data(extracted_data)
    
    # Загружаем
    result = await load_data(transformed, 'warehouse')
    
    print(f"Total time: {time.time() - start:.1f}s")
    # Было бы 6 сек последовательно, теперь 4 сек

await etl_pipeline()

Asyncio vs Threading vs Multiprocessing

import asyncio
import threading
import multiprocessing
import time

# Asyncio: идеален для I/O
async def io_task():
    await asyncio.sleep(1)  # I/O операция

# Threading: для I/O с ограничениями GIL
def io_task_thread():
    time.sleep(1)  # I/O операция

# Multiprocessing: для CPU-bound работы
def cpu_task():
    total = sum(range(100_000_000))  # CPU work
    return total

# Выбор инструмента:
if io_bound and io_heavy:          # 1000+ запросов к API
    use_asyncio()                   # ✅ Asyncio
elif io_bound and thread_blocking:   # БД с GIL issues
    use_threading()                 # ✅ Threading
elif cpu_bound:                      # ML обработка, крупные вычисления
    use_multiprocessing()           # ✅ Multiprocessing

Практические советы

1. Всегда используй asyncio.gather для параллелизма

# ❌ Плохо: последовательное выполнение
for user_id in user_ids:
    await fetch_user(user_id)

# ✅ Хорошо: параллельное выполнение
tasks = [fetch_user(uid) for uid in user_ids]
await asyncio.gather(*tasks)

2. Используй asyncio.Semaphore для ограничения параллелизма

async def limited_fetch(semaphore, session, url):
    async with semaphore:  # Только N одновременных запросов
        return await fetch_data(session, url)

semaphore = asyncio.Semaphore(100)  # Макс 100 одновременных
tasks = [
    limited_fetch(semaphore, session, url)
    for url in urls
]
results = await asyncio.gather(*tasks)

3. Обрабатывай исключения корректно

try:
    results = await asyncio.gather(
        *tasks,
        return_exceptions=True  # Не прерывать на первой ошибке
    )
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
except Exception as e:
    print(f"Critical error: {e}")

Вывод

Asyncio используется в Data Engineering для:

  • Параллельной загрузки данных из множества API (~10x ускорение)
  • Параллельной обработки файлов
  • Параллельного обновления БД
  • Streaming приложений (Kafka, WebSocket)
  • Оркестрации ETL процессов

Главное преимущество: низкие затраты на память и CPU по сравнению с threading/multiprocessing при обработке I/O операций.

Когда НЕ использовать: для CPU-bound операций (ML, расчёты) — нужны потоки или процессы.

Что такое asyncio в Python и для каких задач его применяют? | PrepBro