← Назад к вопросам
Зачем нужен семафор в Python?
2.2 Middle🔥 91 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Зачем нужен семафор в Python?
Семафор — это примитив синхронизации, который ограничивает количество потоков (или асинхронных задач), имеющих одновременный доступ к определённому ресурсу. Это критически важный инструмент для контроля параллелизма.
Основная идея
Семафор хранит внутренний счётчик:
- acquire() — уменьшает счётчик (блокирует, если счётчик = 0)
- release() — увеличивает счётчик
- Когда счётчик = 0, новые потоки ждут
1. Проблема без семафора: ресурсное истощение
import threading
import time
import requests
# ❌ БЕЗ семафора — все 1000 потоков одновременно делают запросы
def make_request_without_limit():
urls = [f"https://api.example.com/endpoint?id={i}" for i in range(1000)]
threads = []
for url in urls:
t = threading.Thread(target=requests.get, args=(url,))
threads.append(t)
t.start() # 1000 потоков начали одновременно!
for t in threads:
t.join()
# Проблемы:
# 1. Переполнение соединений (слишком много сокетов)
# 2. Истощение памяти
# 3. Падение сервера
# 4. Too many open files ошибка
# ✅ С семафором — максимум 5 одновременных потоков
def make_request_with_limit():
semaphore = threading.Semaphore(5)
urls = [f"https://api.example.com/endpoint?id={i}" for i in range(1000)]
def fetch(url):
with semaphore:
requests.get(url)
threads = []
for url in urls:
t = threading.Thread(target=fetch, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
2. Ограничение подключений к БД
Одно из самых частых применений — пул соединений:
import sqlite3
import threading
from threading import Semaphore
class DatabaseConnectionPool:
def __init__(self, db_path: str, max_connections: int = 5):
self.db_path = db_path
self.semaphore = Semaphore(max_connections)
self.local = threading.local()
def get_connection(self):
"""Получить соединение с БД"""
# Ждём, если уже max_connections потоков работают
self.semaphore.acquire()
try:
# Создаём соединение для текущего потока
conn = sqlite3.connect(self.db_path)
return conn
except Exception:
self.semaphore.release() # Освобождаем, если ошибка
raise
def release_connection(self, conn):
"""Вернуть соединение"""
conn.close()
self.semaphore.release()
# Использование
pool = DatabaseConnectionPool(':memory:', max_connections=3)
def database_worker(worker_id: int):
conn = pool.get_connection()
try:
cursor = conn.cursor()
cursor.execute('SELECT ?', (f"Worker {worker_id}",))
print(f"Worker {worker_id}: {cursor.fetchone()}")
time.sleep(1) # Имитация работы
finally:
pool.release_connection(conn)
# Запускаем 10 рабочих, но одновременно работают только 3
threads = []
for i in range(10):
t = threading.Thread(target=database_worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
3. API рейт-лимитинг
import requests
from threading import Semaphore
from datetime import datetime, timedelta
class RateLimitedAPIClient:
def __init__(self, max_concurrent_requests: int = 10):
self.semaphore = Semaphore(max_concurrent_requests)
self.api_key = "your-api-key"
def fetch_data(self, endpoint: str):
"""Получить данные с соблюдением rate limit"""
with self.semaphore: # Context manager для гарантированного release
print(f"[{datetime.now()}] Начинаю запрос: {endpoint}")
try:
response = requests.get(
f"https://api.example.com{endpoint}",
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=10
)
return response.json()
except requests.RequestException as e:
print(f"Ошибка: {e}")
return None
finally:
print(f"[{datetime.now()}] Завершил запрос: {endpoint}")
# Использование
client = RateLimitedAPIClient(max_concurrent_requests=5)
endpoints = [
"/users",
"/posts",
"/comments",
"/photos",
] * 5 # 20 запросов
threads = []
for endpoint in endpoints:
t = threading.Thread(target=client.fetch_data, args=(endpoint,))
threads.append(t)
t.start()
for t in threads:
t.join()
4. Ограничение CPU-bound задач
from threading import Semaphore
import hashlib
class CPULimitedExecutor:
"""Ограничить количество CPU-intensive задач"""
def __init__(self, max_workers: int = 4):
self.semaphore = Semaphore(max_workers)
def compute_hash(self, data: bytes) -> str:
with self.semaphore:
print(f"Вычисляю хеш... (активных: {self.semaphore._value})")
# CPU-intensive операция
hash_value = hashlib.sha256(data).hexdigest()
return hash_value
# Использование
executor = CPULimitedExecutor(max_workers=2)
data_list = [f"data_{i}".encode() for i in range(8)]
threads = []
for i, data in enumerate(data_list):
t = threading.Thread(
target=executor.compute_hash,
args=(data,),
name=f"Worker-{i}"
)
threads.append(t)
t.start()
for t in threads:
t.join()
5. Producer-Consumer с семафором
from threading import Semaphore, Lock
import time
from queue import Queue
class BoundedBuffer:
"""Буфер ограниченного размера с синхронизацией"""
def __init__(self, capacity: int = 5):
self.buffer = Queue(maxsize=capacity)
# empty семафор: сколько пустых слотов
self.empty = Semaphore(capacity)
# full семафор: сколько заполненных слотов
self.full = Semaphore(0)
self.mutex = Lock()
def produce(self, item):
"""Добавить элемент"""
self.empty.acquire() # Ждём пустого слота
with self.mutex:
self.buffer.put(item)
print(f"Произвёл: {item} (в буфере: {self.buffer.qsize()})")
self.full.release() # Сигнализируем о новом элементе
def consume(self):
"""Получить элемент"""
self.full.acquire() # Ждём полного слота
with self.mutex:
item = self.buffer.get()
print(f"Потребил: {item} (в буфере: {self.buffer.qsize()})")
self.empty.release() # Сигнализируем об пустом слоте
return item
# Использование
buffer = BoundedBuffer(capacity=3)
def producer():
for i in range(5):
buffer.produce(f"item-{i}")
time.sleep(0.5)
def consumer():
for i in range(5):
buffer.consume()
time.sleep(0.8)
t1 = threading.Thread(target=producer, name="Producer")
t2 = threading.Thread(target=consumer, name="Consumer")
t1.start()
t2.start()
t1.join()
t2.join()
6. asyncio.Semaphore для асинхронного кода
import asyncio
class AsyncAPIClient:
"""Асинхронный клиент с ограничением одновременных запросов"""
def __init__(self, max_concurrent: int = 3):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def fetch(self, url: str):
async with self.semaphore:
print(f"Начинаю запрос: {url}")
# Имитация асинхронного запроса
await asyncio.sleep(2)
print(f"Завершил запрос: {url}")
return f"Данные с {url}"
async def fetch_all(self, urls: list[str]):
tasks = [self.fetch(url) for url in urls]
return await asyncio.gather(*tasks)
# Использование
async def main():
client = AsyncAPIClient(max_concurrent=3)
urls = [
"https://api.example.com/1",
"https://api.example.com/2",
"https://api.example.com/3",
"https://api.example.com/4",
"https://api.example.com/5",
]
results = await client.fetch_all(urls)
print(results)
asyncio.run(main())
7. BoundedSemaphore vs Semaphore
# Semaphore — счётчик может расти бесконечно
sem = threading.Semaphore(1)
sem.release() # OK
sem.release() # OK
sem.release() # Счётчик = 3 (опасно!)
# BoundedSemaphore — ограничен начальным значением
bounded = threading.BoundedSemaphore(1)
bounded.release() # OK
bounded.release() # ValueError: Semaphore released too many times
# ✅ Рекомендуется использовать BoundedSemaphore
8. Обработка исключений
from threading import Semaphore
import logging
logger = logging.getLogger(__name__)
class SafeSemaphoreWrapper:
"""Семафор с безопасной обработкой ошибок"""
def __init__(self, max_workers: int = 5):
self.semaphore = BoundedSemaphore(max_workers)
def execute_task(self, task_func, *args, **kwargs):
"""Выполнить задачу с гарантированным освобождением семафора"""
self.semaphore.acquire()
try:
return task_func(*args, **kwargs)
except Exception as e:
logger.error(f"Ошибка при выполнении задачи: {e}")
raise
finally:
self.semaphore.release() # ВСЕГДА освобождаем
Практические рекомендации
- Используй context managers для гарантированного release
- Выбирай правильный размер (обычно = количество ресурсов)
- Мониторь блокировки (потенциальные deadlock)
- Комбинируй с Lock для защиты данных
- Используй asyncio.Semaphore для async кода
Выводы
Семафор нужен для:
- Ограничения параллелизма (max connections)
- Защиты ресурсов (БД, API, файлы)
- Rate limiting (соблюдение квот)
- Предотвращения истощения памяти (слишком много потоков)
- Producer-Consumer паттернов (синхронизация)
Это один из самых важных инструментов многопоточного программирования.