← Назад к вопросам

Есть ли Semaphore в Python?

2.2 Middle🔥 141 комментариев
#Python Core#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Есть ли Semaphore в Python?

Да, в Python есть встроенный модуль threading.Semaphore для работы с семафорами. Это примитив синхронизации, который используется для ограничения количества потоков, имеющих одновременный доступ к ресурсу.

Основная концепция

Семафор — это счётчик, который можно увеличивать (release) и уменьшать (acquire). Когда счётчик = 0, попытка получить семафор блокирует поток.

1. Использование Semaphore

import threading
import time

# Ограничиваем одновременный доступ двумя потоками
semaphore = threading.Semaphore(2)

def worker(worker_id):
    with semaphore:
        print(f"Worker {worker_id} начал работу")
        time.sleep(2)
        print(f"Worker {worker_id} закончил")

# Создаём 5 потоков, но одновременно работают только 2
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Вывод:

Worker 0 начал работу
Worker 1 начал работу
Worker 0 закончил
Worker 2 начал работу
Worker 1 закончил
Worker 3 начал работу
Worker 2 закончил
Worker 4 начал работу
Worker 3 закончил
Worker 4 закончил

2. Разница между Semaphore и BoundedSemaphore

import threading

# Semaphore — можно увеличивать счётчик бесконечно
sem = threading.Semaphore(1)
sem.release()  # OK
sem.release()  # OK
sem.release()  # Счётчик = 3

# BoundedSemaphore — ограничена начальным значением
bounded_sem = threading.BoundedSemaphore(1)
bounded_sem.release()  # OK
bounded_sem.release()  # ValueError: Semaphore released too many times

3. Практический пример: рейт-лимитинг API запросов

import threading
import time
import requests
from typing import List

class RateLimitedClient:
    def __init__(self, max_concurrent_requests: int = 3):
        self.semaphore = threading.Semaphore(max_concurrent_requests)
        self.request_count = 0
        self.lock = threading.Lock()
    
    def fetch_url(self, url: str) -> str:
        with self.semaphore:  # Ограничиваем количество одновременных запросов
            with self.lock:
                self.request_count += 1
                count = self.request_count
            
            print(f"[{count}] Начинаю запрос: {url}")
            try:
                response = requests.get(url, timeout=5)
                return response.text[:100]  # Первые 100 символов
            except Exception as e:
                return f"Ошибка: {e}"
            finally:
                print(f"[{count}] Завершил запрос: {url}")

# Использование
client = RateLimitedClient(max_concurrent_requests=2)
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

threads = []
for url in urls:
    t = threading.Thread(target=client.fetch_url, args=(url,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

4. Семафор в asyncio

Для асинхронного кода есть asyncio.Semaphore:

import asyncio

async def async_worker(semaphore, worker_id):
    async with semaphore:
        print(f"Worker {worker_id} начал работу")
        await asyncio.sleep(2)
        print(f"Worker {worker_id} закончил")

async def main():
    # Ограничиваем 3 одновременными задачами
    semaphore = asyncio.Semaphore(3)
    
    tasks = [async_worker(semaphore, i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

5. Практический пример: пул соединений к БД

import threading
import sqlite3
from contextlib import contextmanager
from typing import Iterator

class ConnectionPool:
    def __init__(self, db_path: str, pool_size: int = 5):
        self.db_path = db_path
        self.semaphore = threading.Semaphore(pool_size)
        self.local = threading.local()
    
    @contextmanager
    def get_connection(self) -> Iterator[sqlite3.Connection]:
        """Получить соединение из пула"""
        self.semaphore.acquire()
        try:
            # Создаём соединение для текущего потока если не существует
            if not hasattr(self.local, 'connection'):
                self.local.connection = sqlite3.connect(self.db_path)
            
            yield self.local.connection
        finally:
            self.semaphore.release()

# Использование
pool = ConnectionPool(':memory:', pool_size=3)

def query_worker(worker_id):
    with pool.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT ?', (f"Worker {worker_id}",))
        print(f"Worker {worker_id}: {cursor.fetchone()}")

threads = []
for i in range(10):
    t = threading.Thread(target=query_worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

6. Counting Semaphore для синхронизации событий

import threading
import time

class ProducerConsumer:
    def __init__(self):
        # empty семафор: сколько пустых мест в буфере
        self.empty = threading.Semaphore(5)  # Буфер на 5 элементов
        # full семафор: сколько заполненных элементов
        self.full = threading.Semaphore(0)
        # Мьютекс для защиты буфера
        self.mutex = threading.Lock()
        self.buffer = []
    
    def produce(self, item):
        self.empty.acquire()  # Ждём пустого места
        
        with self.mutex:
            self.buffer.append(item)
            print(f"Произвёл: {item}, буфер: {self.buffer}")
        
        self.full.release()  # Сигнализируем о новом элементе
    
    def consume(self):
        self.full.acquire()  # Ждём полного элемента
        
        with self.mutex:
            item = self.buffer.pop(0)
            print(f"Потребил: {item}, буфер: {self.buffer}")
        
        self.empty.release()  # Сигнализируем об пустом месте
        return item

# Использование
pc = ProducerConsumer()

def producer():
    for i in range(5):
        pc.produce(f"item-{i}")
        time.sleep(0.5)

def consumer():
    for i in range(5):
        pc.consume()
        time.sleep(0.8)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

t1.start()
t2.start()

t1.join()
t2.join()

7. Сравнение примитивов синхронизации

ПримитивНазначениеИспользование
SemaphoreОграничить количество потоковРейт-лимитинг, пул ресурсов
Lock/MutexИсключительный доступ (1 поток)Защита критических секций
RLockРекурсивная блокировкаКогда тот же поток вызывает сам себя
EventСигнализация событияПоток ждёт события от другого
ConditionУсловная переменнаяСложная синхронизация потоков
BarrierСинхронизация N потоковВсе должны достичь точки

8. Вложенные семафоры

import threading

class ThreadPool:
    def __init__(self, num_workers: int = 4):
        # Семафор для ограничения рабочих потоков
        self.worker_semaphore = threading.Semaphore(num_workers)
        # Семафор для синхронизации очереди задач
        self.task_semaphore = threading.Semaphore(0)
        self.task_queue = []
        self.lock = threading.Lock()
        self.shutdown = False
    
    def submit(self, task):
        with self.lock:
            self.task_queue.append(task)
        self.task_semaphore.release()
    
    def worker(self):
        while not self.shutdown:
            self.task_semaphore.acquire(timeout=1)
            
            with self.lock:
                if not self.task_queue:
                    continue
                task = self.task_queue.pop(0)
            
            try:
                task()
            finally:
                self.worker_semaphore.release()

Анти-паттерны

# ❌ Плохо — может привести к deadlock
sem1 = threading.Semaphore(1)
sem2 = threading.Semaphore(1)

def thread1():
    sem1.acquire()
    sem2.acquire()  # Может заблокироваться

def thread2():
    sem2.acquire()
    sem1.acquire()  # Может заблокироваться

# ✅ Хорошо — всегда захватывать в одном порядке
def thread1():
    sem1.acquire()
    sem2.acquire()

def thread2():
    sem1.acquire()
    sem2.acquire()

Выводы

Семафоры в Python — это мощный инструмент для:

  • Ограничения ресурсов (max connections, threads)
  • Рейт-лимитинга (API запросы)
  • Синхронизации потоков (producer-consumer)
  • Пулов ресурсов (БД соединения, worker threads)

Выбирайте между threading.Semaphore (потоки) и asyncio.Semaphore (асинхронность) в зависимости от архитектуры вашего приложения.

Есть ли Semaphore в Python? | PrepBro