← Назад к вопросам
Есть ли Semaphore в Python?
2.2 Middle🔥 141 комментариев
#Python Core#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Есть ли Semaphore в Python?
Да, в Python есть встроенный модуль threading.Semaphore для работы с семафорами. Это примитив синхронизации, который используется для ограничения количества потоков, имеющих одновременный доступ к ресурсу.
Основная концепция
Семафор — это счётчик, который можно увеличивать (release) и уменьшать (acquire). Когда счётчик = 0, попытка получить семафор блокирует поток.
1. Использование Semaphore
import threading
import time
# Ограничиваем одновременный доступ двумя потоками
semaphore = threading.Semaphore(2)
def worker(worker_id):
with semaphore:
print(f"Worker {worker_id} начал работу")
time.sleep(2)
print(f"Worker {worker_id} закончил")
# Создаём 5 потоков, но одновременно работают только 2
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
Вывод:
Worker 0 начал работу
Worker 1 начал работу
Worker 0 закончил
Worker 2 начал работу
Worker 1 закончил
Worker 3 начал работу
Worker 2 закончил
Worker 4 начал работу
Worker 3 закончил
Worker 4 закончил
2. Разница между Semaphore и BoundedSemaphore
import threading
# Semaphore — можно увеличивать счётчик бесконечно
sem = threading.Semaphore(1)
sem.release() # OK
sem.release() # OK
sem.release() # Счётчик = 3
# BoundedSemaphore — ограничена начальным значением
bounded_sem = threading.BoundedSemaphore(1)
bounded_sem.release() # OK
bounded_sem.release() # ValueError: Semaphore released too many times
3. Практический пример: рейт-лимитинг API запросов
import threading
import time
import requests
from typing import List
class RateLimitedClient:
def __init__(self, max_concurrent_requests: int = 3):
self.semaphore = threading.Semaphore(max_concurrent_requests)
self.request_count = 0
self.lock = threading.Lock()
def fetch_url(self, url: str) -> str:
with self.semaphore: # Ограничиваем количество одновременных запросов
with self.lock:
self.request_count += 1
count = self.request_count
print(f"[{count}] Начинаю запрос: {url}")
try:
response = requests.get(url, timeout=5)
return response.text[:100] # Первые 100 символов
except Exception as e:
return f"Ошибка: {e}"
finally:
print(f"[{count}] Завершил запрос: {url}")
# Использование
client = RateLimitedClient(max_concurrent_requests=2)
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
threads = []
for url in urls:
t = threading.Thread(target=client.fetch_url, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
4. Семафор в asyncio
Для асинхронного кода есть asyncio.Semaphore:
import asyncio
async def async_worker(semaphore, worker_id):
async with semaphore:
print(f"Worker {worker_id} начал работу")
await asyncio.sleep(2)
print(f"Worker {worker_id} закончил")
async def main():
# Ограничиваем 3 одновременными задачами
semaphore = asyncio.Semaphore(3)
tasks = [async_worker(semaphore, i) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
5. Практический пример: пул соединений к БД
import threading
import sqlite3
from contextlib import contextmanager
from typing import Iterator
class ConnectionPool:
def __init__(self, db_path: str, pool_size: int = 5):
self.db_path = db_path
self.semaphore = threading.Semaphore(pool_size)
self.local = threading.local()
@contextmanager
def get_connection(self) -> Iterator[sqlite3.Connection]:
"""Получить соединение из пула"""
self.semaphore.acquire()
try:
# Создаём соединение для текущего потока если не существует
if not hasattr(self.local, 'connection'):
self.local.connection = sqlite3.connect(self.db_path)
yield self.local.connection
finally:
self.semaphore.release()
# Использование
pool = ConnectionPool(':memory:', pool_size=3)
def query_worker(worker_id):
with pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('SELECT ?', (f"Worker {worker_id}",))
print(f"Worker {worker_id}: {cursor.fetchone()}")
threads = []
for i in range(10):
t = threading.Thread(target=query_worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
6. Counting Semaphore для синхронизации событий
import threading
import time
class ProducerConsumer:
def __init__(self):
# empty семафор: сколько пустых мест в буфере
self.empty = threading.Semaphore(5) # Буфер на 5 элементов
# full семафор: сколько заполненных элементов
self.full = threading.Semaphore(0)
# Мьютекс для защиты буфера
self.mutex = threading.Lock()
self.buffer = []
def produce(self, item):
self.empty.acquire() # Ждём пустого места
with self.mutex:
self.buffer.append(item)
print(f"Произвёл: {item}, буфер: {self.buffer}")
self.full.release() # Сигнализируем о новом элементе
def consume(self):
self.full.acquire() # Ждём полного элемента
with self.mutex:
item = self.buffer.pop(0)
print(f"Потребил: {item}, буфер: {self.buffer}")
self.empty.release() # Сигнализируем об пустом месте
return item
# Использование
pc = ProducerConsumer()
def producer():
for i in range(5):
pc.produce(f"item-{i}")
time.sleep(0.5)
def consumer():
for i in range(5):
pc.consume()
time.sleep(0.8)
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
7. Сравнение примитивов синхронизации
| Примитив | Назначение | Использование |
|---|---|---|
| Semaphore | Ограничить количество потоков | Рейт-лимитинг, пул ресурсов |
| Lock/Mutex | Исключительный доступ (1 поток) | Защита критических секций |
| RLock | Рекурсивная блокировка | Когда тот же поток вызывает сам себя |
| Event | Сигнализация события | Поток ждёт события от другого |
| Condition | Условная переменная | Сложная синхронизация потоков |
| Barrier | Синхронизация N потоков | Все должны достичь точки |
8. Вложенные семафоры
import threading
class ThreadPool:
def __init__(self, num_workers: int = 4):
# Семафор для ограничения рабочих потоков
self.worker_semaphore = threading.Semaphore(num_workers)
# Семафор для синхронизации очереди задач
self.task_semaphore = threading.Semaphore(0)
self.task_queue = []
self.lock = threading.Lock()
self.shutdown = False
def submit(self, task):
with self.lock:
self.task_queue.append(task)
self.task_semaphore.release()
def worker(self):
while not self.shutdown:
self.task_semaphore.acquire(timeout=1)
with self.lock:
if not self.task_queue:
continue
task = self.task_queue.pop(0)
try:
task()
finally:
self.worker_semaphore.release()
Анти-паттерны
# ❌ Плохо — может привести к deadlock
sem1 = threading.Semaphore(1)
sem2 = threading.Semaphore(1)
def thread1():
sem1.acquire()
sem2.acquire() # Может заблокироваться
def thread2():
sem2.acquire()
sem1.acquire() # Может заблокироваться
# ✅ Хорошо — всегда захватывать в одном порядке
def thread1():
sem1.acquire()
sem2.acquire()
def thread2():
sem1.acquire()
sem2.acquire()
Выводы
Семафоры в Python — это мощный инструмент для:
- Ограничения ресурсов (max connections, threads)
- Рейт-лимитинга (API запросы)
- Синхронизации потоков (producer-consumer)
- Пулов ресурсов (БД соединения, worker threads)
Выбирайте между threading.Semaphore (потоки) и asyncio.Semaphore (асинхронность) в зависимости от архитектуры вашего приложения.