Что такое asyncio в Python и для каких задач его применяют?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Asyncio в Python: асинхронное программирование для Data Engineer
Asyncio — это встроенная библиотека Python для написания асинхронного кода, который позволяет одновременно выполнять множество операций I/O (ввод-вывод) без использования потоков или процессов.
Что такое asyncio
Синхронный код (блокирующий):
# Когда мы ждём ответ от API, весь код замирает
import requests
import time
start = time.time()
# Запрос 1: 2 секунды
response1 = requests.get('https://api.example.com/data1')
print(f"Response 1: {response1.status_code}") # ждём 2 сек
# Запрос 2: 2 секунды
response2 = requests.get('https://api.example.com/data2')
print(f"Response 2: {response2.status_code}") # ждём ещё 2 сек
# Запрос 3: 2 секунды
response3 = requests.get('https://api.example.com/data3')
print(f"Response 3: {response3.status_code}") # ждём ещё 2 сек
print(f"Total time: {time.time() - start:.1f}s") # 6+ секунд! Неэффективно
Асинхронный код (неблокирующий) с asyncio:
import asyncio
import aiohttp # асинхронная библиотека для HTTP
import time
async def fetch_data(session, url):
"""Асинхронная функция для получения данных"""
async with session.get(url) as response:
return await response.status_code # ждём ответ
async def main():
start = time.time()
# Создаем асинхронную сессию
async with aiohttp.ClientSession() as session:
# Запускаем все 3 запроса ОДНОВРЕМЕННО
tasks = [
fetch_data(session, 'https://api.example.com/data1'),
fetch_data(session, 'https://api.example.com/data2'),
fetch_data(session, 'https://api.example.com/data3'),
]
# Ждём, пока ВСЕ завершат (параллельно)
results = await asyncio.gather(*tasks)
print(f"Results: {results}") # [200, 200, 200]
print(f"Total time: {time.time() - start:.1f}s") # ~2 секунды! Эффективно
# Запускаем асинхронную функцию
asyncio.run(main())
Сравнение подходов:
| Подход | Время для 3 запросов | Использование памяти | Сложность |
|---|---|---|---|
| Синхронный | 6 секунд | Низкое | Простой |
| Потоки (Threading) | 2 секунды | Среднее | Сложный (race conditions) |
| Процессы (Multiprocessing) | 2 секунды | Высокое | Очень сложный (IPC) |
| Asyncio | 2 секунды | Низкое | Средне (await/async) |
Event Loop: сердце asyncio
Как работает asyncio:
import asyncio
async def task1():
print("Task 1 started")
await asyncio.sleep(2) # Асинхронная пауза
print("Task 1 finished")
async def task2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 finished")
async def main():
# Event loop выполняет задачи в "круговую"
# Когда task1 ждёт (await), event loop переключается на task2
await asyncio.gather(task1(), task2())
asyncio.run(main())
# Вывод:
# Task 1 started
# Task 2 started
# Task 2 finished (через 1 сек)
# Task 1 finished (через 2 сек)
# Всего: 2 секунды, а не 3!
Диаграмма event loop:
Время Event Loop Task 1 Task 2
0ms ┌─ Запустить task1 ──→ начало
├─ Запустить task2 ──────────────────→ начало
│
1000ms ├─ Task2 ждёт sleep ← переключение ← await sleep(1)
│ (готова)
└─ Task2 завершится
│
2000ms └─ Task1 ждёт sleep ← переключение ← await sleep(2)
(готова)
Task1 завершится
Применение asyncio в Data Engineering
1. Загрузка данных из множества API
import asyncio
import aiohttp
from typing import List, Dict
async def fetch_user_data(session, user_id: int) -> Dict:
"""Загружает данные одного пользователя"""
async with session.get(f'https://api.example.com/users/{user_id}') as resp:
return await resp.json()
async def load_all_users(user_ids: List[int]) -> List[Dict]:
"""Загружает данные для всех пользователей параллельно"""
async with aiohttp.ClientSession() as session:
# Вместо 10000 последовательных запросов (2+ часа)
# Делаем 1000 параллельных батчей по 10 запросов (~5 минут)
tasks = [fetch_user_data(session, uid) for uid in user_ids]
# Выполняем батчами для экономии памяти
batch_size = 100
results = []
for i in range(0, len(tasks), batch_size):
batch = tasks[i:i+batch_size]
batch_results = await asyncio.gather(*batch)
results.extend(batch_results)
return results
# Использование
user_ids = list(range(1, 10001)) # 10K пользователей
data = asyncio.run(load_all_users(user_ids))
2. Параллельная обработка данных
import asyncio
import pandas as pd
async def process_file(file_path: str) -> pd.DataFrame:
"""Обрабатывает один файл"""
# Имитируем долгую обработку
await asyncio.sleep(1)
df = pd.read_csv(file_path)
return df.groupby('category').size()
async def process_all_files(file_paths: List[str]) -> Dict:
"""Обрабатывает все файлы параллельно"""
tasks = [process_file(fp) for fp in file_paths]
results = await asyncio.gather(*tasks)
# Объединяем результаты
combined = pd.concat(results, axis=1).fillna(0).sum(axis=1)
return combined.to_dict()
# Использование
files = [f'data_2024_{i:02d}.csv' for i in range(1, 13)] # 12 месяцев
results = asyncio.run(process_all_files(files))
3. Мониторинг и обновление БД
import asyncio
import asyncpg # асинхронный драйвер PostgreSQL
async def update_user_status(pool, user_id: int, new_status: str):
"""Обновляет статус пользователя"""
async with pool.acquire() as connection:
await connection.execute(
'UPDATE users SET status = $1 WHERE id = $2',
new_status, user_id
)
async def batch_update_users(dsn: str, updates: Dict[int, str]):
"""Обновляет статусы для множества пользователей"""
# Создаём пул подключений
pool = await asyncpg.create_pool(dsn, min_size=5, max_size=20)
try:
# Обновляем все пользователей параллельно
tasks = [
update_user_status(pool, user_id, status)
for user_id, status in updates.items()
]
await asyncio.gather(*tasks)
finally:
await pool.close()
# Использование
updates = {1: 'active', 2: 'inactive', 3: 'active', ...} # 1M обновлений
await batch_update_users('postgresql://user:pass@localhost/db', updates)
4. Streaming данных из Kafka в реальном времени
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
async def process_kafka_messages():
"""Читает сообщения из Kafka и обрабатывает их"""
consumer = AIOKafkaConsumer(
'input-topic',
bootstrap_servers='localhost:9092',
group_id='my-group'
)
producer = AIOKafkaProducer(
bootstrap_servers='localhost:9092'
)
await consumer.start()
await producer.start()
try:
async for message in consumer:
# Обрабатываем сообщение
data = message.value.decode('utf-8')
result = process_event(data) # быстрая обработка
# Отправляем результат в другой топик
await producer.send('output-topic', result.encode('utf-8'))
finally:
await consumer.stop()
await producer.stop()
# Обработка 1M сообщений в реальном времени
await process_kafka_messages()
5. Оркестрация ETL процессов
import asyncio
import time
async def extract_data(source: str):
"""Extract фаза"""
print(f"Extracting from {source}...")
await asyncio.sleep(2) # имитируем I/O
return {"raw_data": f"from {source}"}
async def transform_data(data):
"""Transform фаза"""
print("Transforming...")
await asyncio.sleep(1)
return {"transformed": data}
async def load_data(data, destination: str):
"""Load фаза"""
print(f"Loading to {destination}...")
await asyncio.sleep(1)
return f"Loaded {len(data)} records"
async def etl_pipeline():
"""Полный ETL pipeline с параллельным извлечением"""
# Извлекаем из 3 источников параллельно
start = time.time()
extracted_data = await asyncio.gather(
extract_data('api_1'),
extract_data('api_2'),
extract_data('api_3'),
)
# Трансформируем
transformed = await transform_data(extracted_data)
# Загружаем
result = await load_data(transformed, 'warehouse')
print(f"Total time: {time.time() - start:.1f}s")
# Было бы 6 сек последовательно, теперь 4 сек
await etl_pipeline()
Asyncio vs Threading vs Multiprocessing
import asyncio
import threading
import multiprocessing
import time
# Asyncio: идеален для I/O
async def io_task():
await asyncio.sleep(1) # I/O операция
# Threading: для I/O с ограничениями GIL
def io_task_thread():
time.sleep(1) # I/O операция
# Multiprocessing: для CPU-bound работы
def cpu_task():
total = sum(range(100_000_000)) # CPU work
return total
# Выбор инструмента:
if io_bound and io_heavy: # 1000+ запросов к API
use_asyncio() # ✅ Asyncio
elif io_bound and thread_blocking: # БД с GIL issues
use_threading() # ✅ Threading
elif cpu_bound: # ML обработка, крупные вычисления
use_multiprocessing() # ✅ Multiprocessing
Практические советы
1. Всегда используй asyncio.gather для параллелизма
# ❌ Плохо: последовательное выполнение
for user_id in user_ids:
await fetch_user(user_id)
# ✅ Хорошо: параллельное выполнение
tasks = [fetch_user(uid) for uid in user_ids]
await asyncio.gather(*tasks)
2. Используй asyncio.Semaphore для ограничения параллелизма
async def limited_fetch(semaphore, session, url):
async with semaphore: # Только N одновременных запросов
return await fetch_data(session, url)
semaphore = asyncio.Semaphore(100) # Макс 100 одновременных
tasks = [
limited_fetch(semaphore, session, url)
for url in urls
]
results = await asyncio.gather(*tasks)
3. Обрабатывай исключения корректно
try:
results = await asyncio.gather(
*tasks,
return_exceptions=True # Не прерывать на первой ошибке
)
for result in results:
if isinstance(result, Exception):
print(f"Error: {result}")
except Exception as e:
print(f"Critical error: {e}")
Вывод
Asyncio используется в Data Engineering для:
- Параллельной загрузки данных из множества API (~10x ускорение)
- Параллельной обработки файлов
- Параллельного обновления БД
- Streaming приложений (Kafka, WebSocket)
- Оркестрации ETL процессов
Главное преимущество: низкие затраты на память и CPU по сравнению с threading/multiprocessing при обработке I/O операций.
Когда НЕ использовать: для CPU-bound операций (ML, расчёты) — нужны потоки или процессы.