← Назад к вопросам

Какие знаешь основные операции очереди?

1.0 Junior🔥 81 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Основные операции очереди

Очередь (Queue) — это фундаментальная структура данных, используемая везде, от системного программирования до веб-приложений.

1. FIFO очередь (First In First Out)

FIFO очередь — элементы извлекаются в том же порядке, в котором были добавлены.

from collections import deque

# Создание очереди
queue = deque()

# Основные операции
queue.append(1)      # Добавить элемент (в конец)
queue.append(2)
queue.append(3)
print(queue)         # deque([1, 2, 3])

element = queue.popleft()  # Извлечь элемент (из начала)
print(element)       # 1
print(queue)         # deque([2, 3])

# Проверка пусто ли
if queue:
    print("Queue not empty")

if not queue:  # или
    print("Queue is empty")

# Размер
size = len(queue)
print(f"Queue size: {size}")

2. Операции

Основные:

  • enqueue(item) — добавить в очередь
  • dequeue() — извлечь из очереди
  • is_empty() — проверить пусто
  • size() — размер
from collections import deque

class Queue:
    def __init__(self):
        self.items = deque()
    
    def enqueue(self, item):
        """Добавить элемент в конец очереди"""
        self.items.append(item)
    
    def dequeue(self):
        """Извлечь элемент из начала очереди"""
        if self.is_empty():
            raise IndexError("Queue is empty")
        return self.items.popleft()
    
    def is_empty(self) -> bool:
        """Проверить пусто ли"""
        return len(self.items) == 0
    
    def size(self) -> int:
        """Размер очереди"""
        return len(self.items)
    
    def peek(self):
        """Посмотреть первый элемент БЕЗ удаления"""
        if self.is_empty():
            raise IndexError("Queue is empty")
        return self.items[0]

# Использование
q = Queue()
q.enqueue("task1")
q.enqueue("task2")
q.enqueue("task3")

print(f"First task: {q.peek()}")  # task1
print(f"Processing: {q.dequeue()}")  # task1
print(f"Size: {q.size()}")  # 2

3. Priority Queue

Priority Queue — элементы извлекаются по приоритету, не по порядку добавления.

import heapq
from typing import Any, Tuple

class PriorityQueue:
    def __init__(self):
        self.items = []
        self.counter = 0  # Для стабильности при равных приоритетах
    
    def enqueue(self, item: Any, priority: int):
        """Добавить с приоритетом (меньше = выше)"""
        # heapq работает с min-heap
        heapq.heappush(self.items, (priority, self.counter, item))
        self.counter += 1
    
    def dequeue(self) -> Tuple[Any, int]:
        """Извлечь элемент с наивысшим приоритетом"""
        if self.is_empty():
            raise IndexError("Queue is empty")
        priority, _, item = heapq.heappop(self.items)
        return item, priority
    
    def is_empty(self) -> bool:
        return len(self.items) == 0

# Использование: система задач
pq = PriorityQueue()

pq.enqueue("Low priority task", priority=3)
pq.enqueue("Critical bug", priority=1)
pq.enqueue("Medium task", priority=2)

while not pq.is_empty():
    task, priority = pq.dequeue()
    print(f"Processing (priority {priority}): {task}")

# Output:
# Processing (priority 1): Critical bug
# Processing (priority 2): Medium task
# Processing (priority 3): Low priority task

4. Double-Ended Queue (Deque)

Deque — можно добавлять и извлекать с обоих концов.

from collections import deque

dq = deque()

# Добавление с обоих концов
dq.append("right1")      # Добавить справа
dq.appendleft("left1")   # Добавить слева
dq.append("right2")
dq.appendleft("left2")

print(dq)  # deque(['left2', 'left1', 'right1', 'right2'])

# Извлечение с обоих концов
right = dq.pop()         # Извлечь справа
left = dq.popleft()      # Извлечь слева

print(f"From right: {right}")  # right2
print(f"From left: {left}")    # left2
print(dq)                # deque(['left1', 'right1'])

# Полезно для:
# - Скользящего окна (sliding window)
# - Деков в алгоритмах
# - Реализации стеков (если нужны оба конца)

5. Асинхронная очередь (asyncio.Queue)

asyncio.Queue — для асинхронной обработки задач.

import asyncio

async def producer(queue: asyncio.Queue, n: int):
    """Производитель — добавляет элементы"""
    for i in range(n):
        await queue.put(f"Task {i}")
        print(f"Produced: Task {i}")
        await asyncio.sleep(0.5)
    
    # Сигнал завершения
    await queue.put(None)

async def consumer(queue: asyncio.Queue, worker_id: int):
    """Потребитель — обрабатывает элементы"""
    while True:
        item = await queue.get()  # Ждёт элемента
        
        if item is None:  # Сигнал завершения
            queue.task_done()
            break
        
        print(f"Worker {worker_id} processing: {item}")
        await asyncio.sleep(1)  # Обработка
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)  # Максимум 5 элементов
    
    # Один производитель
    producer_task = asyncio.create_task(producer(queue, 10))
    
    # Несколько потребителей
    consumer_tasks = [
        asyncio.create_task(consumer(queue, i))
        for i in range(3)
    ]
    
    # Ждём завершения
    await producer_task
    await queue.join()  # Ждём обработки всех элементов
    
    # Отменяем потребителей
    for task in consumer_tasks:
        task.cancel()

asyncio.run(main())

6. Thread-safe Queue

queue.Queue — потокобезопасная очередь для многопоточности.

import queue
import threading
import time

def producer_thread(q: queue.Queue):
    """Производитель в отдельном потоке"""
    for i in range(5):
        item = f"Item {i}"
        q.put(item)
        print(f"Produced: {item}")
        time.sleep(0.5)

def consumer_thread(q: queue.Queue, worker_id: int):
    """Потребитель в отдельном потоке"""
    while True:
        try:
            item = q.get(timeout=2)  # Ждём с таймаутом
            print(f"Worker {worker_id} got: {item}")
            time.sleep(1)  # Обработка
            q.task_done()
        except queue.Empty:
            print(f"Worker {worker_id} waiting...")

# Создаём очередь
q = queue.Queue(maxsize=3)

# Запускаем потоки
producer = threading.Thread(target=producer_thread, args=(q,))
consumers = [
    threading.Thread(target=consumer_thread, args=(q, i))
    for i in range(2)
]

producer.start()
for c in consumers:
    c.start()

producer.join()
q.join()  # Ждём обработки всех

7. Практические примеры

Пример 1: Система обработки задач (Task Queue)

from dataclasses import dataclass
from typing import Callable
import asyncio

@dataclass
class Task:
    id: str
    func: Callable
    args: tuple = ()
    priority: int = 1

class TaskProcessor:
    def __init__(self, max_workers: int = 3):
        self.queue = asyncio.PriorityQueue()
        self.workers = max_workers
    
    async def add_task(self, task: Task):
        await self.queue.put((task.priority, task))
    
    async def worker(self, worker_id: int):
        while True:
            try:
                priority, task = await asyncio.wait_for(
                    self.queue.get(), timeout=5
                )
                print(f"Worker {worker_id}: Processing {task.id}")
                result = task.func(*task.args)
                print(f"Worker {worker_id}: Completed {task.id}")
                self.queue.task_done()
            except asyncio.TimeoutError:
                break
    
    async def run(self):
        # Запустить workers
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.workers)
        ]
        
        # Ждём завершения
        await self.queue.join()
        
        # Отменяем workers
        for w in workers:
            w.cancel()

# Использование
async def main():
    processor = TaskProcessor(max_workers=2)
    
    # Добавляем задачи
    for i in range(5):
        task = Task(
            id=f"task_{i}",
            func=lambda x: x ** 2,
            args=(i,),
            priority=i % 2  # Чередуем приоритеты
        )
        await processor.add_task(task)
    
    await processor.run()

asyncio.run(main())

Пример 2: Обработчик сообщений (Message Queue Pattern)

from dataclasses import dataclass
from enum import Enum
import asyncio

class MessageType(Enum):
    USER_SIGNUP = 1
    USER_LOGIN = 2
    ORDER_CREATED = 3
    ORDER_PAID = 4

@dataclass
class Message:
    type: MessageType
    data: dict
    created_at: float

class MessageBus:
    def __init__(self):
        self.queue = asyncio.Queue()
        self.handlers = {}
    
    def register(self, msg_type: MessageType, handler: Callable):
        if msg_type not in self.handlers:
            self.handlers[msg_type] = []
        self.handlers[msg_type].append(handler)
    
    async def publish(self, message: Message):
        await self.queue.put(message)
    
    async def process(self):
        while True:
            message = await self.queue.get()
            
            handlers = self.handlers.get(message.type, [])
            for handler in handlers:
                await handler(message.data)
            
            self.queue.task_done()

# Использование
async def on_user_signup(data):
    print(f"New user: {data['email']}")
    # Отправить приветственное письмо

async def on_order_created(data):
    print(f"Order created: {data['order_id']}")
    # Уведомить склад

async def main():
    bus = MessageBus()
    bus.register(MessageType.USER_SIGNUP, on_user_signup)
    bus.register(MessageType.ORDER_CREATED, on_order_created)
    
    # Запустить обработчик
    processor = asyncio.create_task(bus.process())
    
    # Опубликовать сообщения
    await bus.publish(Message(
        type=MessageType.USER_SIGNUP,
        data={'email': 'user@example.com'}
    ))
    
    await asyncio.sleep(1)
    processor.cancel()

asyncio.run(main())

Сравнение очередей

┌──────────────────┬─────────┬────────────┬────────────┐
│ Тип              │ FIFO    │ Приоритет  │ Потокобез. │
├──────────────────┼─────────┼────────────┼────────────┤
│ deque            │ ✅      │ ✗          │ ✗          │
│ heapq            │ ✗       │ ✅         │ ✗          │
│ asyncio.Queue    │ ✅      │ ✗          │ ✅*        │
│ queue.Queue      │ ✅      │ ✗          │ ✅         │
│ queue.PriorityQ  │ ✗       │ ✅         │ ✅         │
└──────────────────┴─────────┴────────────┴────────────┘
* asyncio-специфичная

Итоги

  • FIFO Queue: deque или queue.Queue
  • Priority Queue: heapq или queue.PriorityQueue
  • Async Tasks: asyncio.Queue
  • Multi-threaded: queue.Queue, queue.PriorityQueue
  • Deque (оба конца): collections.deque

Очереди — основа асинхронных систем, обработки событий и распределённых архитектур.