Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Основные операции очереди
Очередь (Queue) — это фундаментальная структура данных, используемая везде, от системного программирования до веб-приложений.
1. FIFO очередь (First In First Out)
FIFO очередь — элементы извлекаются в том же порядке, в котором были добавлены.
from collections import deque
# Создание очереди
queue = deque()
# Основные операции
queue.append(1) # Добавить элемент (в конец)
queue.append(2)
queue.append(3)
print(queue) # deque([1, 2, 3])
element = queue.popleft() # Извлечь элемент (из начала)
print(element) # 1
print(queue) # deque([2, 3])
# Проверка пусто ли
if queue:
print("Queue not empty")
if not queue: # или
print("Queue is empty")
# Размер
size = len(queue)
print(f"Queue size: {size}")
2. Операции
Основные:
enqueue(item)— добавить в очередьdequeue()— извлечь из очередиis_empty()— проверить пустоsize()— размер
from collections import deque
class Queue:
def __init__(self):
self.items = deque()
def enqueue(self, item):
"""Добавить элемент в конец очереди"""
self.items.append(item)
def dequeue(self):
"""Извлечь элемент из начала очереди"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.items.popleft()
def is_empty(self) -> bool:
"""Проверить пусто ли"""
return len(self.items) == 0
def size(self) -> int:
"""Размер очереди"""
return len(self.items)
def peek(self):
"""Посмотреть первый элемент БЕЗ удаления"""
if self.is_empty():
raise IndexError("Queue is empty")
return self.items[0]
# Использование
q = Queue()
q.enqueue("task1")
q.enqueue("task2")
q.enqueue("task3")
print(f"First task: {q.peek()}") # task1
print(f"Processing: {q.dequeue()}") # task1
print(f"Size: {q.size()}") # 2
3. Priority Queue
Priority Queue — элементы извлекаются по приоритету, не по порядку добавления.
import heapq
from typing import Any, Tuple
class PriorityQueue:
def __init__(self):
self.items = []
self.counter = 0 # Для стабильности при равных приоритетах
def enqueue(self, item: Any, priority: int):
"""Добавить с приоритетом (меньше = выше)"""
# heapq работает с min-heap
heapq.heappush(self.items, (priority, self.counter, item))
self.counter += 1
def dequeue(self) -> Tuple[Any, int]:
"""Извлечь элемент с наивысшим приоритетом"""
if self.is_empty():
raise IndexError("Queue is empty")
priority, _, item = heapq.heappop(self.items)
return item, priority
def is_empty(self) -> bool:
return len(self.items) == 0
# Использование: система задач
pq = PriorityQueue()
pq.enqueue("Low priority task", priority=3)
pq.enqueue("Critical bug", priority=1)
pq.enqueue("Medium task", priority=2)
while not pq.is_empty():
task, priority = pq.dequeue()
print(f"Processing (priority {priority}): {task}")
# Output:
# Processing (priority 1): Critical bug
# Processing (priority 2): Medium task
# Processing (priority 3): Low priority task
4. Double-Ended Queue (Deque)
Deque — можно добавлять и извлекать с обоих концов.
from collections import deque
dq = deque()
# Добавление с обоих концов
dq.append("right1") # Добавить справа
dq.appendleft("left1") # Добавить слева
dq.append("right2")
dq.appendleft("left2")
print(dq) # deque(['left2', 'left1', 'right1', 'right2'])
# Извлечение с обоих концов
right = dq.pop() # Извлечь справа
left = dq.popleft() # Извлечь слева
print(f"From right: {right}") # right2
print(f"From left: {left}") # left2
print(dq) # deque(['left1', 'right1'])
# Полезно для:
# - Скользящего окна (sliding window)
# - Деков в алгоритмах
# - Реализации стеков (если нужны оба конца)
5. Асинхронная очередь (asyncio.Queue)
asyncio.Queue — для асинхронной обработки задач.
import asyncio
async def producer(queue: asyncio.Queue, n: int):
"""Производитель — добавляет элементы"""
for i in range(n):
await queue.put(f"Task {i}")
print(f"Produced: Task {i}")
await asyncio.sleep(0.5)
# Сигнал завершения
await queue.put(None)
async def consumer(queue: asyncio.Queue, worker_id: int):
"""Потребитель — обрабатывает элементы"""
while True:
item = await queue.get() # Ждёт элемента
if item is None: # Сигнал завершения
queue.task_done()
break
print(f"Worker {worker_id} processing: {item}")
await asyncio.sleep(1) # Обработка
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=5) # Максимум 5 элементов
# Один производитель
producer_task = asyncio.create_task(producer(queue, 10))
# Несколько потребителей
consumer_tasks = [
asyncio.create_task(consumer(queue, i))
for i in range(3)
]
# Ждём завершения
await producer_task
await queue.join() # Ждём обработки всех элементов
# Отменяем потребителей
for task in consumer_tasks:
task.cancel()
asyncio.run(main())
6. Thread-safe Queue
queue.Queue — потокобезопасная очередь для многопоточности.
import queue
import threading
import time
def producer_thread(q: queue.Queue):
"""Производитель в отдельном потоке"""
for i in range(5):
item = f"Item {i}"
q.put(item)
print(f"Produced: {item}")
time.sleep(0.5)
def consumer_thread(q: queue.Queue, worker_id: int):
"""Потребитель в отдельном потоке"""
while True:
try:
item = q.get(timeout=2) # Ждём с таймаутом
print(f"Worker {worker_id} got: {item}")
time.sleep(1) # Обработка
q.task_done()
except queue.Empty:
print(f"Worker {worker_id} waiting...")
# Создаём очередь
q = queue.Queue(maxsize=3)
# Запускаем потоки
producer = threading.Thread(target=producer_thread, args=(q,))
consumers = [
threading.Thread(target=consumer_thread, args=(q, i))
for i in range(2)
]
producer.start()
for c in consumers:
c.start()
producer.join()
q.join() # Ждём обработки всех
7. Практические примеры
Пример 1: Система обработки задач (Task Queue)
from dataclasses import dataclass
from typing import Callable
import asyncio
@dataclass
class Task:
id: str
func: Callable
args: tuple = ()
priority: int = 1
class TaskProcessor:
def __init__(self, max_workers: int = 3):
self.queue = asyncio.PriorityQueue()
self.workers = max_workers
async def add_task(self, task: Task):
await self.queue.put((task.priority, task))
async def worker(self, worker_id: int):
while True:
try:
priority, task = await asyncio.wait_for(
self.queue.get(), timeout=5
)
print(f"Worker {worker_id}: Processing {task.id}")
result = task.func(*task.args)
print(f"Worker {worker_id}: Completed {task.id}")
self.queue.task_done()
except asyncio.TimeoutError:
break
async def run(self):
# Запустить workers
workers = [
asyncio.create_task(self.worker(i))
for i in range(self.workers)
]
# Ждём завершения
await self.queue.join()
# Отменяем workers
for w in workers:
w.cancel()
# Использование
async def main():
processor = TaskProcessor(max_workers=2)
# Добавляем задачи
for i in range(5):
task = Task(
id=f"task_{i}",
func=lambda x: x ** 2,
args=(i,),
priority=i % 2 # Чередуем приоритеты
)
await processor.add_task(task)
await processor.run()
asyncio.run(main())
Пример 2: Обработчик сообщений (Message Queue Pattern)
from dataclasses import dataclass
from enum import Enum
import asyncio
class MessageType(Enum):
USER_SIGNUP = 1
USER_LOGIN = 2
ORDER_CREATED = 3
ORDER_PAID = 4
@dataclass
class Message:
type: MessageType
data: dict
created_at: float
class MessageBus:
def __init__(self):
self.queue = asyncio.Queue()
self.handlers = {}
def register(self, msg_type: MessageType, handler: Callable):
if msg_type not in self.handlers:
self.handlers[msg_type] = []
self.handlers[msg_type].append(handler)
async def publish(self, message: Message):
await self.queue.put(message)
async def process(self):
while True:
message = await self.queue.get()
handlers = self.handlers.get(message.type, [])
for handler in handlers:
await handler(message.data)
self.queue.task_done()
# Использование
async def on_user_signup(data):
print(f"New user: {data['email']}")
# Отправить приветственное письмо
async def on_order_created(data):
print(f"Order created: {data['order_id']}")
# Уведомить склад
async def main():
bus = MessageBus()
bus.register(MessageType.USER_SIGNUP, on_user_signup)
bus.register(MessageType.ORDER_CREATED, on_order_created)
# Запустить обработчик
processor = asyncio.create_task(bus.process())
# Опубликовать сообщения
await bus.publish(Message(
type=MessageType.USER_SIGNUP,
data={'email': 'user@example.com'}
))
await asyncio.sleep(1)
processor.cancel()
asyncio.run(main())
Сравнение очередей
┌──────────────────┬─────────┬────────────┬────────────┐
│ Тип │ FIFO │ Приоритет │ Потокобез. │
├──────────────────┼─────────┼────────────┼────────────┤
│ deque │ ✅ │ ✗ │ ✗ │
│ heapq │ ✗ │ ✅ │ ✗ │
│ asyncio.Queue │ ✅ │ ✗ │ ✅* │
│ queue.Queue │ ✅ │ ✗ │ ✅ │
│ queue.PriorityQ │ ✗ │ ✅ │ ✅ │
└──────────────────┴─────────┴────────────┴────────────┘
* asyncio-специфичная
Итоги
- FIFO Queue:
dequeилиqueue.Queue - Priority Queue:
heapqилиqueue.PriorityQueue - Async Tasks:
asyncio.Queue - Multi-threaded:
queue.Queue,queue.PriorityQueue - Deque (оба конца):
collections.deque
Очереди — основа асинхронных систем, обработки событий и распределённых архитектур.