← Назад к вопросам

Какие знаешь механизмы асинхронности?

2.2 Middle🔥 171 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Механизмы асинхронности в Python

Асинхронность — это один из способов повысить производительность приложения, позволяя выполнять несколько операций параллельно. Существует несколько механизмов реализации асинхронности, каждый с собственными особенностями.

1. Threading (многопоточность)

Выполняй несколько функций одновременно в разных потоках.

import threading
import time

def worker(name, delay):
    """Функция, которая выполняется в отдельном потоке"""
    print(f"{name} started")
    time.sleep(delay)
    print(f"{name} finished")

# Создай потоки
thread1 = threading.Thread(target=worker, args=("Thread-1", 2))
thread2 = threading.Thread(target=worker, args=("Thread-2", 3))
thread3 = threading.Thread(target=worker, args=("Thread-3", 1))

# Запусти потоки
thread1.start()
thread2.start()
thread3.start()

# Жди завершения
thread1.join()  # Жди до завершения
thread2.join()
thread3.join()

print("All threads completed")
# Результат: все потоки выполняются параллельно
# Total time: 3 секунды (max из всех), не 6!

Проблемы Threading:

  • GIL (Global Interpreter Lock) — только один поток может выполнять Python код
  • Не подходит для CPU-bound операций — GIL блокирует все равно
  • Context switching overhead — создание потоков дорого
  • Race conditions — нужны locks для синхронизации
import threading

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.lock = threading.Lock()  # Мьютекс для защиты
    
    def withdraw(self, amount):
        with self.lock:  # Критическая секция
            if self.balance >= amount:
                self.balance -= amount
                return True
        return False

account = BankAccount(1000)

def transfer_thread():
    for _ in range(100):
        account.withdraw(1)

threads = [threading.Thread(target=transfer_thread) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Final balance: {account.balance}")  # 0 (если lock сработал)

Когда использовать: I/O-bound операции (сеть, файлы), коли GIL не критичен.

2. Multiprocessing (многопроцессность)

Запусти несколько процессов, каждый с собственным GIL.

import multiprocessing
import time

def compute_heavy(n):
    """CPU-bound операция"""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

if __name__ == '__main__':  # Важно!
    # Последовательно (медленно)
    start = time.time()
    r1 = compute_heavy(50000000)
    r2 = compute_heavy(50000000)
    print(f"Sequential: {time.time() - start:.2f}s")
    
    # Параллельно (быстро)
    start = time.time()
    with multiprocessing.Pool(2) as pool:
        results = pool.map(compute_heavy, [50000000, 50000000])
    print(f"Parallel: {time.time() - start:.2f}s")
    # На двухъядерной системе примерно в 2 раза быстрее!

Плюсы Multiprocessing:

  • Обходит GIL — каждый процесс свой GIL
  • Идеален для CPU-bound операций
  • Истинная параллелизм

Минусы:

  • Дорого запускать новый процесс
  • Inter-process communication медленный
  • Требует pickling данных
import multiprocessing
from multiprocessing import Queue

def producer(queue):
    for i in range(5):
        queue.put(i)
        print(f"Produced: {i}")

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:  # Сигнал завершения
            break
        print(f"Consumed: {item}")

if __name__ == '__main__':
    queue = Queue()
    
    p1 = multiprocessing.Process(target=producer, args=(queue,))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))
    
    p1.start()
    p2.start()
    
    p1.join()
    queue.put(None)  # Сигнал завершения
    p2.join()

Когда использовать: тяжёлые вычисления, обработка больших данных.

3. Async/Await — асинхронное программирование

Небольшие потоки (coroutines) управляются эффективнее.

import asyncio

async def fetch_data(url):
    """Асинхронная функция (coroutine)"""
    print(f"Fetching {url}")
    await asyncio.sleep(2)  # Имитация сетевого запроса
    print(f"Completed {url}")
    return f"Data from {url}"

async def main():
    # Выполни 3 корутины одновременно
    results = await asyncio.gather(
        fetch_data("http://example.com/1"),
        fetch_data("http://example.com/2"),
        fetch_data("http://example.com/3"),
    )
    return results

# Запусти event loop
results = asyncio.run(main())
print(f"Results: {results}")
# Time: 2 секунды (не 6!)

Как это работает:

  1. asyncio.run() создаёт event loop
  2. Event loop выполняет корутины
  3. Когда корутина встречает await, она сдаёт управление
  4. Event loop выполняет другую корутину
  5. Когда первая корутина готова, event loop возвращается к ней
import asyncio
import aiohttp

async def fetch_json(url):
    """Асинхронный HTTP запрос"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

async def main():
    urls = [
        'https://api.github.com/users/github',
        'https://api.github.com/users/google',
        'https://api.github.com/users/microsoft',
    ]
    
    # Выполни все запросы параллельно
    results = await asyncio.gather(
        *[fetch_json(url) for url in urls]
    )
    
    for result in results:
        print(f"{result['login']}: {result['public_repos']} repos")

asyncio.run(main())
# Все 3 запроса выполняются одновременно!

Плюсы Async/Await:

  • Очень эффективно для I/O операций
  • Мало overhead (coroutines легче чем потоки)
  • Простой синтаксис с async/await
  • Масштабируется до тысяч одновременных операций

Минусы:

  • Не помогает для CPU-bound операций
  • Требует async-compatible библиотеки
  • Может быть сложным для новичков

4. Event Loop и Callbacks

import asyncio

def callback_version():
    """Старый стиль: callbacks (до async/await)"""
    
    def on_data_received(data):
        print(f"Data: {data}")
    
    def on_error(error):
        print(f"Error: {error}")
    
    loop = asyncio.new_event_loop()
    
    # Зарегистрируй callback
    loop.call_later(1, lambda: on_data_received("Hello"))
    loop.call_later(2, lambda: on_data_received("World"))
    
    # Запусти loop
    loop.run_until_complete(asyncio.sleep(3))
    loop.close()

callback_version()

# Но async/await гораздо лучше!
async def async_version():
    """Новый стиль: async/await"""
    await asyncio.sleep(1)
    print("After 1 second")
    await asyncio.sleep(2)
    print("After 2 more seconds")

asyncio.run(async_version())

5. Asyncio в FastAPI/Starlette

from fastapi import FastAPI
import asyncio
import aiohttp

app = FastAPI()

@app.get("/")
async def index():
    """Обработчик асинхронно"""
    result = await asyncio.sleep(1)  # Не блокирует поток!
    return {"message": "Hello"}

@app.get("/api/data")
async def get_data():
    """Асинхронная операция с БД"""
    # Этот запрос не блокирует другие запросы
    data = await db.fetch("SELECT * FROM users")
    return data

@app.get("/proxy/{url}")
async def proxy_request(url: str):
    """Прокси запрос через несколько сервисов"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# FastAPI автоматически управляет event loop
# Один процесс может обрабатывать тысячи одновременных запросов!

Сравнение механизмов

МеханизмДля I/OДля CPUOverheadМасштабируемостьСложность
ThreadingХорошоПлохо (GIL)СреднийДо сотенСредняя
MultiprocessingПлохоОтличноВысокийДо сотенВысокая
Async/AwaitОтличноПлохоОчень низкийДо миллионовСредняя
CallbacksХорошоПлохоНизкийТысячиВысокая

Практический пример: обработка данных

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# I/O-bound операция
async def fetch_from_api(endpoint):
    # Имитация сетевого запроса
    await asyncio.sleep(1)
    return f"Data from {endpoint}"

# CPU-bound операция
def heavy_calculation(n):
    result = 0
    for i in range(n):
        result += i ** 2
    return result

async def process_with_async():
    """Для I/O операций используй async/await"""
    start = time.time()
    results = await asyncio.gather(
        fetch_from_api("/users"),
        fetch_from_api("/posts"),
        fetch_from_api("/comments"),
    )
    print(f"Async: {time.time() - start:.2f}s")
    return results

def process_with_threads():
    """Для I/O операций также можно использовать threading"""
    start = time.time()
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(fetch_data, ["/users", "/posts", "/comments"]))
    print(f"Threading: {time.time() - start:.2f}s")
    return results

async def process_with_cpu_bound():
    """Для CPU операций используй multiprocessing"""
    loop = asyncio.get_event_loop()
    
    # Запусти blocking операцию в executor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        result = await loop.run_in_executor(
            executor,
            heavy_calculation,
            100000000
        )
    return result

# asyncio.run(process_with_async())  # ~1 сек (параллельно)
# process_with_threads()  # ~1 сек (параллельно)
# asyncio.run(process_with_cpu_bound())  # ~2 сек (параллельно по CPU)

Best Practices

  1. I/O операции → async/await — наиболее эффективно
  2. CPU операции → multiprocessing — обходит GIL
  3. Комбинируй: async + ThreadPoolExecutor для блокирующего I/O
  4. Избегай: смешивать blocking и non-blocking код
  5. Мониторь: задержки и timeouts в async коде
  6. Тестируй: asyncio код с pytest-asyncio
  7. Используй: context managers (async with) для ресурсов

Выбор механизма

  • FastAPI + requests к БД → async/await
  • Веб-скрепинг множества сайтов → async/await + aiohttp
  • Обработка больших данных → multiprocessing
  • Простой сервер с частыми запросами → async/await
  • Тяжёлые вычисления → multiprocessing
  • Legacy код с блокирующими операциями → threading + asyncio