← Назад к вопросам
Какие знаешь механизмы асинхронности?
2.2 Middle🔥 171 комментариев
#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Механизмы асинхронности в Python
Асинхронность — это один из способов повысить производительность приложения, позволяя выполнять несколько операций параллельно. Существует несколько механизмов реализации асинхронности, каждый с собственными особенностями.
1. Threading (многопоточность)
Выполняй несколько функций одновременно в разных потоках.
import threading
import time
def worker(name, delay):
"""Функция, которая выполняется в отдельном потоке"""
print(f"{name} started")
time.sleep(delay)
print(f"{name} finished")
# Создай потоки
thread1 = threading.Thread(target=worker, args=("Thread-1", 2))
thread2 = threading.Thread(target=worker, args=("Thread-2", 3))
thread3 = threading.Thread(target=worker, args=("Thread-3", 1))
# Запусти потоки
thread1.start()
thread2.start()
thread3.start()
# Жди завершения
thread1.join() # Жди до завершения
thread2.join()
thread3.join()
print("All threads completed")
# Результат: все потоки выполняются параллельно
# Total time: 3 секунды (max из всех), не 6!
Проблемы Threading:
- GIL (Global Interpreter Lock) — только один поток может выполнять Python код
- Не подходит для CPU-bound операций — GIL блокирует все равно
- Context switching overhead — создание потоков дорого
- Race conditions — нужны locks для синхронизации
import threading
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
self.lock = threading.Lock() # Мьютекс для защиты
def withdraw(self, amount):
with self.lock: # Критическая секция
if self.balance >= amount:
self.balance -= amount
return True
return False
account = BankAccount(1000)
def transfer_thread():
for _ in range(100):
account.withdraw(1)
threads = [threading.Thread(target=transfer_thread) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Final balance: {account.balance}") # 0 (если lock сработал)
Когда использовать: I/O-bound операции (сеть, файлы), коли GIL не критичен.
2. Multiprocessing (многопроцессность)
Запусти несколько процессов, каждый с собственным GIL.
import multiprocessing
import time
def compute_heavy(n):
"""CPU-bound операция"""
result = 0
for i in range(n):
result += i ** 2
return result
if __name__ == '__main__': # Важно!
# Последовательно (медленно)
start = time.time()
r1 = compute_heavy(50000000)
r2 = compute_heavy(50000000)
print(f"Sequential: {time.time() - start:.2f}s")
# Параллельно (быстро)
start = time.time()
with multiprocessing.Pool(2) as pool:
results = pool.map(compute_heavy, [50000000, 50000000])
print(f"Parallel: {time.time() - start:.2f}s")
# На двухъядерной системе примерно в 2 раза быстрее!
Плюсы Multiprocessing:
- Обходит GIL — каждый процесс свой GIL
- Идеален для CPU-bound операций
- Истинная параллелизм
Минусы:
- Дорого запускать новый процесс
- Inter-process communication медленный
- Требует pickling данных
import multiprocessing
from multiprocessing import Queue
def producer(queue):
for i in range(5):
queue.put(i)
print(f"Produced: {i}")
def consumer(queue):
while True:
item = queue.get()
if item is None: # Сигнал завершения
break
print(f"Consumed: {item}")
if __name__ == '__main__':
queue = Queue()
p1 = multiprocessing.Process(target=producer, args=(queue,))
p2 = multiprocessing.Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
queue.put(None) # Сигнал завершения
p2.join()
Когда использовать: тяжёлые вычисления, обработка больших данных.
3. Async/Await — асинхронное программирование
Небольшие потоки (coroutines) управляются эффективнее.
import asyncio
async def fetch_data(url):
"""Асинхронная функция (coroutine)"""
print(f"Fetching {url}")
await asyncio.sleep(2) # Имитация сетевого запроса
print(f"Completed {url}")
return f"Data from {url}"
async def main():
# Выполни 3 корутины одновременно
results = await asyncio.gather(
fetch_data("http://example.com/1"),
fetch_data("http://example.com/2"),
fetch_data("http://example.com/3"),
)
return results
# Запусти event loop
results = asyncio.run(main())
print(f"Results: {results}")
# Time: 2 секунды (не 6!)
Как это работает:
- asyncio.run() создаёт event loop
- Event loop выполняет корутины
- Когда корутина встречает
await, она сдаёт управление - Event loop выполняет другую корутину
- Когда первая корутина готова, event loop возвращается к ней
import asyncio
import aiohttp
async def fetch_json(url):
"""Асинхронный HTTP запрос"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
async def main():
urls = [
'https://api.github.com/users/github',
'https://api.github.com/users/google',
'https://api.github.com/users/microsoft',
]
# Выполни все запросы параллельно
results = await asyncio.gather(
*[fetch_json(url) for url in urls]
)
for result in results:
print(f"{result['login']}: {result['public_repos']} repos")
asyncio.run(main())
# Все 3 запроса выполняются одновременно!
Плюсы Async/Await:
- Очень эффективно для I/O операций
- Мало overhead (coroutines легче чем потоки)
- Простой синтаксис с async/await
- Масштабируется до тысяч одновременных операций
Минусы:
- Не помогает для CPU-bound операций
- Требует async-compatible библиотеки
- Может быть сложным для новичков
4. Event Loop и Callbacks
import asyncio
def callback_version():
"""Старый стиль: callbacks (до async/await)"""
def on_data_received(data):
print(f"Data: {data}")
def on_error(error):
print(f"Error: {error}")
loop = asyncio.new_event_loop()
# Зарегистрируй callback
loop.call_later(1, lambda: on_data_received("Hello"))
loop.call_later(2, lambda: on_data_received("World"))
# Запусти loop
loop.run_until_complete(asyncio.sleep(3))
loop.close()
callback_version()
# Но async/await гораздо лучше!
async def async_version():
"""Новый стиль: async/await"""
await asyncio.sleep(1)
print("After 1 second")
await asyncio.sleep(2)
print("After 2 more seconds")
asyncio.run(async_version())
5. Asyncio в FastAPI/Starlette
from fastapi import FastAPI
import asyncio
import aiohttp
app = FastAPI()
@app.get("/")
async def index():
"""Обработчик асинхронно"""
result = await asyncio.sleep(1) # Не блокирует поток!
return {"message": "Hello"}
@app.get("/api/data")
async def get_data():
"""Асинхронная операция с БД"""
# Этот запрос не блокирует другие запросы
data = await db.fetch("SELECT * FROM users")
return data
@app.get("/proxy/{url}")
async def proxy_request(url: str):
"""Прокси запрос через несколько сервисов"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
# FastAPI автоматически управляет event loop
# Один процесс может обрабатывать тысячи одновременных запросов!
Сравнение механизмов
| Механизм | Для I/O | Для CPU | Overhead | Масштабируемость | Сложность |
|---|---|---|---|---|---|
| Threading | Хорошо | Плохо (GIL) | Средний | До сотен | Средняя |
| Multiprocessing | Плохо | Отлично | Высокий | До сотен | Высокая |
| Async/Await | Отлично | Плохо | Очень низкий | До миллионов | Средняя |
| Callbacks | Хорошо | Плохо | Низкий | Тысячи | Высокая |
Практический пример: обработка данных
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
# I/O-bound операция
async def fetch_from_api(endpoint):
# Имитация сетевого запроса
await asyncio.sleep(1)
return f"Data from {endpoint}"
# CPU-bound операция
def heavy_calculation(n):
result = 0
for i in range(n):
result += i ** 2
return result
async def process_with_async():
"""Для I/O операций используй async/await"""
start = time.time()
results = await asyncio.gather(
fetch_from_api("/users"),
fetch_from_api("/posts"),
fetch_from_api("/comments"),
)
print(f"Async: {time.time() - start:.2f}s")
return results
def process_with_threads():
"""Для I/O операций также можно использовать threading"""
start = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(fetch_data, ["/users", "/posts", "/comments"]))
print(f"Threading: {time.time() - start:.2f}s")
return results
async def process_with_cpu_bound():
"""Для CPU операций используй multiprocessing"""
loop = asyncio.get_event_loop()
# Запусти blocking операцию в executor
with concurrent.futures.ProcessPoolExecutor() as executor:
result = await loop.run_in_executor(
executor,
heavy_calculation,
100000000
)
return result
# asyncio.run(process_with_async()) # ~1 сек (параллельно)
# process_with_threads() # ~1 сек (параллельно)
# asyncio.run(process_with_cpu_bound()) # ~2 сек (параллельно по CPU)
Best Practices
- I/O операции → async/await — наиболее эффективно
- CPU операции → multiprocessing — обходит GIL
- Комбинируй: async + ThreadPoolExecutor для блокирующего I/O
- Избегай: смешивать blocking и non-blocking код
- Мониторь: задержки и timeouts в async коде
- Тестируй: asyncio код с pytest-asyncio
- Используй: context managers (async with) для ресурсов
Выбор механизма
- FastAPI + requests к БД → async/await
- Веб-скрепинг множества сайтов → async/await + aiohttp
- Обработка больших данных → multiprocessing
- Простой сервер с частыми запросами → async/await
- Тяжёлые вычисления → multiprocessing
- Legacy код с блокирующими операциями → threading + asyncio