← Назад к вопросам

Для чего используется backpressure?

3.0 Senior🔥 11 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Backpressure: Управление потоком данных

Backpressure (противодавление) — это механизм для управления скоростью передачи данных в системах, где потребитель не может обработать данные с той же скоростью, с которой их производит источник. Это критично для стабильности и эффективности асинхронных систем.

1. Проблема без Backpressure

Неконтролируемое потребление памяти при быстром производстве данных:

import asyncio
import random

# Плохо: без контроля скорости
async def producer_no_backpressure():
    for i in range(1000000):
        # Производимся данные очень быстро
        yield f"item_{i}"
        await asyncio.sleep(0.001)  # Мало работы

async def consumer_slow():
    # Потребитель медленный
    async for item in producer_no_backpressure():
        print(f"Processing: {item}")
        await asyncio.sleep(1)  # Долгая обработка
        
# Результат: буфер заполняется, память исчерпывается

Проблема: производитель создаёт 1000 элементов в секунду, потребитель обрабатывает только 1 элемент в секунду. Буфер растёт до 1000 элементов в памяти!

2. Backpressure через asyncio.Queue

Ограничение размера очереди заставляет производителя ждать:

import asyncio

async def producer_with_backpressure(queue: asyncio.Queue, max_items=100):
    """Производитель, который ждёт если очередь полна"""
    for i in range(1000):
        print(f"Producing item {i}")
        await queue.put(f"item_{i}")  # Блокируется если очередь полна
        await asyncio.sleep(0.01)

async def consumer_slow(queue: asyncio.Queue):
    """Медленный потребитель"""
    while True:
        item = await queue.get()
        print(f"Processing: {item}")
        await asyncio.sleep(1)  # Долгая обработка
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)  # Максимум 10 элементов
    
    await asyncio.gather(
        producer_with_backpressure(queue),
        consumer_slow(queue)
    )

asyncio.run(main())

Результат: Когда очередь заполнится на 10 элементов, производитель остановится и будет ждать, пока потребитель освободит место. Память под контролем.

3. Backpressure в HTTP запросах

Ограничение одновременных запросов к сервису:

import aiohttp
import asyncio
from asyncio import Semaphore

class HttpClientWithBackpressure:
    def __init__(self, max_concurrent_requests=10):
        self.semaphore = Semaphore(max_concurrent_requests)
    
    async def fetch(self, session, url):
        async with self.semaphore:
            async with session.get(url) as response:
                return await response.text()
    
    async def fetch_many(self, urls):
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url) for url in urls]
            return await asyncio.gather(*tasks)

# Использование
client = HttpClientWithBackpressure(max_concurrent_requests=5)
urls = [f"https://api.example.com/users/{i}" for i in range(100)]
results = await client.fetch_many(urls)

Результат: Всегда только 5 одновременных запросов, остальные ждут в очереди. Сервер не будет перегружен.

4. Backpressure в потоковой обработке данных

Обработка больших файлов с контролем памяти:

async def read_file_with_backpressure(filepath, chunk_size=1024*1024):
    """Чтение файла с backpressure"""
    queue = asyncio.Queue(maxsize=5)  # Максимум 5 чанков в памяти
    
    async def producer():
        with open(filepath, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    await queue.put(None)  # EOF
                    break
                await queue.put(chunk)
                print(f"Queued chunk, size: {queue.qsize()}")
    
    async def consumer():
        total = 0
        while True:
            chunk = await queue.get()
            if chunk is None:
                break
            total += len(chunk)
            print(f"Processing {len(chunk)} bytes, total: {total}")
            await asyncio.sleep(0.5)  # Имитация обработки
            queue.task_done()
        return total
    
    producer_task = asyncio.create_task(producer())
    consumer_task = asyncio.create_task(consumer())
    
    await producer_task
    return await consumer_task

# Использование
result = await read_file_with_backpressure('/path/to/large/file.bin')
print(f"Processed {result} bytes")

5. Backpressure в RabbitMQ / Kafka

Отправка сообщений с ограничением на буфер:

import aio_pika
import asyncio

async def producer_with_backpressure():
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    channel = await connection.channel()
    exchange = await channel.declare_exchange('my_exchange', aio_pika.ExchangeType.DIRECT)
    queue = await channel.declare_queue('my_queue')
    await queue.bind(exchange, 'routing_key')
    
    for i in range(1000):
        message = aio_pika.Message(body=f"Message {i}".encode())
        # Если буфер брокера полон, отправка заблокируется
        await exchange.publish(message, routing_key='routing_key')
        print(f"Sent message {i}")
        await asyncio.sleep(0.01)
    
    await connection.close()

asyncio.run(producer_with_backpressure())

6. Backpressure в Stream обработке (FastAPI)

Отправка больших файлов без загрузки в память:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def generate_file():
    """Генерирует данные по 1MB чанками"""
    chunk_size = 1024 * 1024
    for i in range(100):
        chunk = b"x" * chunk_size
        yield chunk
        await asyncio.sleep(0.1)  # Имитация обработки

@app.get("/large-file")
async def download_large_file():
    return StreamingResponse(generate_file(), media_type="application/octet-stream")

Как это работает: Клиент получает данные по мере их создания. Если клиент медленный, генератор паузируется. Память не переполняется.

7. Backpressure в WebSocket соединениях

Отправка сообщений в WebSocket с контролем скорости:

from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect
import asyncio

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    try:
        while True:
            # Получаем от клиента сигнал готовности
            message = await websocket.receive_text()
            
            if message == "ready":
                # Отправляем данные только когда клиент готов
                for i in range(1000):
                    await websocket.send_text(f"data_{i}")
                    await asyncio.sleep(0.1)  # Контроль скорости
    except WebSocketDisconnect:
        print("Client disconnected")

8. Практический пример: обработка очереди задач

import asyncio
from typing import Callable

class TaskProcessor:
    def __init__(self, max_concurrent=10, max_queue_size=100):
        self.queue = asyncio.Queue(maxsize=max_queue_size)
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_task(self, task_id, handler: Callable):
        async with self.semaphore:
            print(f"Processing task {task_id}")
            return await handler(task_id)
    
    async def add_task(self, task_id, handler):
        """Блокируется если очередь полна (backpressure)"""
        await self.queue.put((task_id, handler))
        print(f"Task {task_id} queued")
    
    async def run(self):
        while True:
            task_id, handler = await self.queue.get()
            await self.process_task(task_id, handler)
            self.queue.task_done()

# Использование
async def handle_task(task_id):
    await asyncio.sleep(1)
    print(f"Task {task_id} done")

processor = TaskProcessor(max_concurrent=5, max_queue_size=20)

async def producer():
    for i in range(100):
        await processor.add_task(i, handle_task)  # Будет ждать если очередь полна

async def main():
    await asyncio.gather(
        processor.run(),
        producer()
    )

asyncio.run(main())

Ключевые преимущества Backpressure

  • Контроль памяти: Предотвращает неконтролируемый рост памяти
  • Стабильность: Предотвращает падение сервиса из-за перегрузки
  • Справедливость: Все задачи обрабатываются в справедливом порядке
  • Масштабируемость: Система может обрабатывать больше данных без увеличения ресурсов

Заключение

Backpressure — это не опциональное улучшение, а необходимость для высоконагруженных систем. Используй asyncio.Queue с maxsize, Semaphore для ограничения параллелизма и Stream ответы для больших данных.

Для чего используется backpressure? | PrepBro