Как работает Event Pool?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Event Pool (Event Loop и пулы)
Event Pool — это механизм для управления асинхронными операциями и масштабируемой обработкой множества событий одновременно. Расскажу о двух основных концепциях: Event Loop и Connection Pool.
1. Event Loop (цикл событий)
Event Loop — это основа асинхронного программирования в Python (asyncio). Это сердце асинхронной программы, которое управляет выполнением корутин.
Как работает:
import asyncio
async def task1():
print("Task 1 начало")
await asyncio.sleep(2)
print("Task 1 конец")
async def task2():
print("Task 2 начало")
await asyncio.sleep(1)
print("Task 2 конец")
async def main():
# Event Loop запускает обе задачи одновременно
# Пока task1 ждет (await sleep), task2 выполняется
await asyncio.gather(task1(), task2())
# Event Loop берет на себя управление
asyncio.run(main())
# Вывод:
# Task 1 начало
# Task 2 начало
# Task 2 конец (через 1 сек)
# Task 1 конец (через 2 сек)
# Всего время: ~2 сек, а не 3
Процесс работы Event Loop:
- Получает события (I/O операции, таймеры, сигналы)
- Добавляет готовые корутины в очередь
- Выполняет корутину до первого await
- Переключается на другую корутину
- Ждет completion события
- Возобновляет выполнение
Выбор Event Loop:
# Получить текущий event loop
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop() # только внутри async функции
# Создать новый event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# На Windows использовать ProactorEventLoop
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
2. Connection Pool (пул соединений)
Connection Pool — это кеш готовых подключений к БД для избежания дорогих операций создания нового подключения.
Проблема без пула:
import psycopg2
# Каждый запрос создает новое подключение — МЕДЛЕННО
for i in range(1000):
conn = psycopg2.connect("dbname=mydb user=postgres")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
conn.close()
# Создание + закрытие занимает ~10ms каждый раз
Решение с пулом:
from psycopg_pool import ConnectionPool
# Создаем пул: min_size=5, max_size=20
with ConnectionPool(
"postgresql://postgres@localhost/mydb",
min_size=5,
max_size=20
) as pool:
# Соединения переиспользуются
for i in range(1000):
with pool.connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
# Соединение возвращается в пул, не закрывается
3. Thread Pool (пул потоков)
Thread Pool управляет рабочими потоками для выполнения синхронного кода параллельно.
from concurrent.futures import ThreadPoolExecutor
import time
def cpu_bound_task(n):
"""Синхронная функция (медленная)"""
total = 0
for i in range(n):
total += i
return total
# Без пула — последовательно
start = time.time()
for i in range(4):
cpu_bound_task(10**8)
print(f"Без пула: {time.time() - start:.2f}s")
# С пулом — параллельно
with ThreadPoolExecutor(max_workers=4) as executor:
start = time.time()
tasks = [executor.submit(cpu_bound_task, 10**8) for _ in range(4)]
results = [task.result() for task in tasks]
print(f"С пулом: {time.time() - start:.2f}s")
4. Process Pool (пул процессов)
Для CPU-bound операций используем отдельные процессы (обходим GIL):
from concurrent.futures import ProcessPoolExecutor
def heavy_computation(n):
return sum(i**2 for i in range(n))
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(heavy_computation, [10**7] * 4)
print(list(results))
5. Async + Connection Pool (лучшая практика)
import asyncio
import asyncpg
class DatabasePool:
def __init__(self, dsn, min_size=5, max_size=20):
self.dsn = dsn
self.min_size = min_size
self.max_size = max_size
self.pool = None
async def init(self):
self.pool = await asyncpg.create_pool(
self.dsn,
min_size=self.min_size,
max_size=self.max_size
)
async def fetch_users(self):
async with self.pool.acquire() as conn:
return await conn.fetch('SELECT * FROM users')
async def close(self):
await self.pool.close()
# Использование
async def main():
db = DatabasePool('postgresql://localhost/mydb')
await db.init()
# Одновременно выполняем 100 запросов
tasks = [db.fetch_users() for _ in range(100)]
results = await asyncio.gather(*tasks)
await db.close()
asyncio.run(main())
6. Мониторинг пула
async def monitor_pool(pool):
"""Отслеживаем состояние пула"""
while True:
print(f"Pool size: {pool.get_size()}")
print(f"Free connections: {pool.get_idle_size()}")
print(f"Waiting tasks: {pool._holders.__len__()}")
await asyncio.sleep(5)
Итоговая схема
Event Loop управляет асинхронными задачами, Connection Pool переиспользует БД соединения, Thread/Process Pool параллелит синхронный код. Вместе они обеспечивают высокую производительность веб-приложения.
Примеры использования:
- FastAPI с asyncpg пулом → обработка 10k RPS
- Scrapy с пулом потоков → параллельный парсинг
- Celery с пулом процессов → распределённые задачи