← Назад к вопросам

Зачем нужен семафор в Python?

2.2 Middle🔥 91 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Зачем нужен семафор в Python?

Семафор — это примитив синхронизации, который ограничивает количество потоков (или асинхронных задач), имеющих одновременный доступ к определённому ресурсу. Это критически важный инструмент для контроля параллелизма.

Основная идея

Семафор хранит внутренний счётчик:

  • acquire() — уменьшает счётчик (блокирует, если счётчик = 0)
  • release() — увеличивает счётчик
  • Когда счётчик = 0, новые потоки ждут

1. Проблема без семафора: ресурсное истощение

import threading
import time
import requests

# ❌ БЕЗ семафора — все 1000 потоков одновременно делают запросы
def make_request_without_limit():
    urls = [f"https://api.example.com/endpoint?id={i}" for i in range(1000)]
    threads = []
    
    for url in urls:
        t = threading.Thread(target=requests.get, args=(url,))
        threads.append(t)
        t.start()  # 1000 потоков начали одновременно!
    
    for t in threads:
        t.join()
    
    # Проблемы:
    # 1. Переполнение соединений (слишком много сокетов)
    # 2. Истощение памяти
    # 3. Падение сервера
    # 4. Too many open files ошибка

# ✅ С семафором — максимум 5 одновременных потоков
def make_request_with_limit():
    semaphore = threading.Semaphore(5)
    urls = [f"https://api.example.com/endpoint?id={i}" for i in range(1000)]
    
    def fetch(url):
        with semaphore:
            requests.get(url)
    
    threads = []
    for url in urls:
        t = threading.Thread(target=fetch, args=(url,))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()

2. Ограничение подключений к БД

Одно из самых частых применений — пул соединений:

import sqlite3
import threading
from threading import Semaphore

class DatabaseConnectionPool:
    def __init__(self, db_path: str, max_connections: int = 5):
        self.db_path = db_path
        self.semaphore = Semaphore(max_connections)
        self.local = threading.local()
    
    def get_connection(self):
        """Получить соединение с БД"""
        # Ждём, если уже max_connections потоков работают
        self.semaphore.acquire()
        
        try:
            # Создаём соединение для текущего потока
            conn = sqlite3.connect(self.db_path)
            return conn
        except Exception:
            self.semaphore.release()  # Освобождаем, если ошибка
            raise
    
    def release_connection(self, conn):
        """Вернуть соединение"""
        conn.close()
        self.semaphore.release()

# Использование
pool = DatabaseConnectionPool(':memory:', max_connections=3)

def database_worker(worker_id: int):
    conn = pool.get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT ?', (f"Worker {worker_id}",))
        print(f"Worker {worker_id}: {cursor.fetchone()}")
        time.sleep(1)  # Имитация работы
    finally:
        pool.release_connection(conn)

# Запускаем 10 рабочих, но одновременно работают только 3
threads = []
for i in range(10):
    t = threading.Thread(target=database_worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

3. API рейт-лимитинг

import requests
from threading import Semaphore
from datetime import datetime, timedelta

class RateLimitedAPIClient:
    def __init__(self, max_concurrent_requests: int = 10):
        self.semaphore = Semaphore(max_concurrent_requests)
        self.api_key = "your-api-key"
    
    def fetch_data(self, endpoint: str):
        """Получить данные с соблюдением rate limit"""
        with self.semaphore:  # Context manager для гарантированного release
            print(f"[{datetime.now()}] Начинаю запрос: {endpoint}")
            
            try:
                response = requests.get(
                    f"https://api.example.com{endpoint}",
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    timeout=10
                )
                return response.json()
            except requests.RequestException as e:
                print(f"Ошибка: {e}")
                return None
            finally:
                print(f"[{datetime.now()}] Завершил запрос: {endpoint}")

# Использование
client = RateLimitedAPIClient(max_concurrent_requests=5)
endpoints = [
    "/users",
    "/posts",
    "/comments",
    "/photos",
] * 5  # 20 запросов

threads = []
for endpoint in endpoints:
    t = threading.Thread(target=client.fetch_data, args=(endpoint,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

4. Ограничение CPU-bound задач

from threading import Semaphore
import hashlib

class CPULimitedExecutor:
    """Ограничить количество CPU-intensive задач"""
    
    def __init__(self, max_workers: int = 4):
        self.semaphore = Semaphore(max_workers)
    
    def compute_hash(self, data: bytes) -> str:
        with self.semaphore:
            print(f"Вычисляю хеш... (активных: {self.semaphore._value})")
            # CPU-intensive операция
            hash_value = hashlib.sha256(data).hexdigest()
            return hash_value

# Использование
executor = CPULimitedExecutor(max_workers=2)

data_list = [f"data_{i}".encode() for i in range(8)]
threads = []

for i, data in enumerate(data_list):
    t = threading.Thread(
        target=executor.compute_hash,
        args=(data,),
        name=f"Worker-{i}"
    )
    threads.append(t)
    t.start()

for t in threads:
    t.join()

5. Producer-Consumer с семафором

from threading import Semaphore, Lock
import time
from queue import Queue

class BoundedBuffer:
    """Буфер ограниченного размера с синхронизацией"""
    
    def __init__(self, capacity: int = 5):
        self.buffer = Queue(maxsize=capacity)
        # empty семафор: сколько пустых слотов
        self.empty = Semaphore(capacity)
        # full семафор: сколько заполненных слотов
        self.full = Semaphore(0)
        self.mutex = Lock()
    
    def produce(self, item):
        """Добавить элемент"""
        self.empty.acquire()  # Ждём пустого слота
        
        with self.mutex:
            self.buffer.put(item)
            print(f"Произвёл: {item} (в буфере: {self.buffer.qsize()})")
        
        self.full.release()  # Сигнализируем о новом элементе
    
    def consume(self):
        """Получить элемент"""
        self.full.acquire()  # Ждём полного слота
        
        with self.mutex:
            item = self.buffer.get()
            print(f"Потребил: {item} (в буфере: {self.buffer.qsize()})")
        
        self.empty.release()  # Сигнализируем об пустом слоте
        return item

# Использование
buffer = BoundedBuffer(capacity=3)

def producer():
    for i in range(5):
        buffer.produce(f"item-{i}")
        time.sleep(0.5)

def consumer():
    for i in range(5):
        buffer.consume()
        time.sleep(0.8)

t1 = threading.Thread(target=producer, name="Producer")
t2 = threading.Thread(target=consumer, name="Consumer")

t1.start()
t2.start()

t1.join()
t2.join()

6. asyncio.Semaphore для асинхронного кода

import asyncio

class AsyncAPIClient:
    """Асинхронный клиент с ограничением одновременных запросов"""
    
    def __init__(self, max_concurrent: int = 3):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch(self, url: str):
        async with self.semaphore:
            print(f"Начинаю запрос: {url}")
            # Имитация асинхронного запроса
            await asyncio.sleep(2)
            print(f"Завершил запрос: {url}")
            return f"Данные с {url}"
    
    async def fetch_all(self, urls: list[str]):
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)

# Использование
async def main():
    client = AsyncAPIClient(max_concurrent=3)
    urls = [
        "https://api.example.com/1",
        "https://api.example.com/2",
        "https://api.example.com/3",
        "https://api.example.com/4",
        "https://api.example.com/5",
    ]
    
    results = await client.fetch_all(urls)
    print(results)

asyncio.run(main())

7. BoundedSemaphore vs Semaphore

# Semaphore — счётчик может расти бесконечно
sem = threading.Semaphore(1)
sem.release()  # OK
sem.release()  # OK
sem.release()  # Счётчик = 3 (опасно!)

# BoundedSemaphore — ограничен начальным значением
bounded = threading.BoundedSemaphore(1)
bounded.release()  # OK
bounded.release()  # ValueError: Semaphore released too many times

# ✅ Рекомендуется использовать BoundedSemaphore

8. Обработка исключений

from threading import Semaphore
import logging

logger = logging.getLogger(__name__)

class SafeSemaphoreWrapper:
    """Семафор с безопасной обработкой ошибок"""
    
    def __init__(self, max_workers: int = 5):
        self.semaphore = BoundedSemaphore(max_workers)
    
    def execute_task(self, task_func, *args, **kwargs):
        """Выполнить задачу с гарантированным освобождением семафора"""
        self.semaphore.acquire()
        try:
            return task_func(*args, **kwargs)
        except Exception as e:
            logger.error(f"Ошибка при выполнении задачи: {e}")
            raise
        finally:
            self.semaphore.release()  # ВСЕГДА освобождаем

Практические рекомендации

  1. Используй context managers для гарантированного release
  2. Выбирай правильный размер (обычно = количество ресурсов)
  3. Мониторь блокировки (потенциальные deadlock)
  4. Комбинируй с Lock для защиты данных
  5. Используй asyncio.Semaphore для async кода

Выводы

Семафор нужен для:

  • Ограничения параллелизма (max connections)
  • Защиты ресурсов (БД, API, файлы)
  • Rate limiting (соблюдение квот)
  • Предотвращения истощения памяти (слишком много потоков)
  • Producer-Consumer паттернов (синхронизация)

Это один из самых важных инструментов многопоточного программирования.