← Назад к вопросам

Что такое очереди в Redis?

2.2 Middle🔥 221 комментариев
#Базы данных (NoSQL)#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Очереди в Redis

Очереди (queues) в Redis — это структуры данных, которые позволяют хранить и обрабатывать задачи в порядке FIFO (First In, First Out). Это один из самых мощных инструментов Redis для масштабируемых систем.

Суть очереди

Очередь работает как обычная ввода-вывода:

  • Один или несколько producers (производители) добавляют задачи в конец очереди
  • Один или несколько consumers (потребители) берут задачи с начала очереди
  • Задачи обрабатываются по мере их получения

Базовые команды Redis

import redis

# Подключаемся к Redis
red = redis.Redis(host='localhost', port=6379, decode_responses=True)

# RPUSH — добавить в конец очереди
red.rpush('task_queue', 'send_email')
red.rpush('task_queue', 'process_payment')
red.rpush('task_queue', 'generate_report')

# LPUSH — добавить в начало (для приоритета)
red.lpush('priority_queue', 'urgent_task')

# LLEN — получить длину очереди
queue_size = red.llen('task_queue')  # 3

# LPOP — взять задачу с начала (удаляет)
task = red.lpop('task_queue')  # 'send_email'

# LRANGE — посмотреть все элементы
all_tasks = red.lrange('task_queue', 0, -1)  # ['process_payment', 'generate_report']

# LTRIM — оставить только N элементов
red.ltrim('task_queue', 0, 99)  # Оставить первые 100 элементов

Producer-Consumer паттерн

Producer (отправитель задач):

import redis
import json

red = redis.Redis()

def send_task(task_name, data):
    """Отправляет задачу в очередь"""
    task = {
        'name': task_name,
        'data': data
    }
    red.rpush('tasks', json.dumps(task))
    print(f"Task sent: {task_name}")

# Отправляем несколько задач
send_task('send_email', {'email': 'user@example.com', 'subject': 'Hello'})
send_task('process_payment', {'user_id': 123, 'amount': 99.99})
send_task('generate_report', {'report_type': 'monthly'})

Consumer (обработчик задач):

import redis
import json
import time

red = redis.Redis()

def process_tasks():
    """Непрерывно обрабатывает задачи из очереди"""
    while True:
        # BLPOP — блокирующее извлечение (ждёт, если очередь пуста)
        result = red.blpop('tasks', timeout=1)  # Ждёт 1 сек
        
        if result is None:
            print("Нет задач, жду...")
            continue
        
        queue_name, task_json = result
        task = json.loads(task_json)
        
        try:
            print(f"Processing: {task['name']}")
            
            if task['name'] == 'send_email':
                send_email(task['data'])
            elif task['name'] == 'process_payment':
                process_payment(task['data'])
            elif task['name'] == 'generate_report':
                generate_report(task['data'])
            
            print(f"Completed: {task['name']}")
        
        except Exception as e:
            print(f"Error: {e}")
            # Можно отправить задачу в очередь ошибок
            red.rpush('failed_tasks', task_json)

if __name__ == '__main__':
    process_tasks()

BLPOP vs LPOP

LPOP — неблокирующее, возвращает сразу:

task = red.lpop('queue')  # Если очередь пуста, вернёт None

BLPOP — блокирующее, ждёт задачу:

# Ждёт до 5 секунд, пока не появится задача
result = red.blpop('queue', timeout=5)
if result:
    queue_name, task = result
else:
    print("Timeout: нет задач")

BLPOP намного эффективнее для consumer'ов, потому что не тратит CPU на постоянную проверку.

Обработка ошибок и retry

def process_with_retry(queue_name, max_retries=3):
    while True:
        result = red.blpop(queue_name, timeout=1)
        if not result:
            continue
        
        _, task_json = result
        task = json.loads(task_json)
        
        retries = task.get('retries', 0)
        
        try:
            handle_task(task)
        except Exception as e:
            if retries < max_retries:
                print(f"Retry {retries + 1}/{max_retries}")
                task['retries'] = retries + 1
                # Отправляем обратно в очередь
                red.rpush(queue_name, json.dumps(task))
            else:
                print(f"Failed after {max_retries} retries")
                # Отправляем в очередь мёртвых писем
                red.rpush('dead_letter_queue', json.dumps(task))

Несколько очередей (приоритеты)

def process_with_priority():
    """Обрабатывает приоритетные очереди"""
    while True:
        # Проверяем очереди в порядке приоритета
        # BLPOP поддерживает несколько ключей!
        result = red.blpop(
            ['priority_high', 'priority_normal', 'priority_low'],
            timeout=1
        )
        
        if result:
            queue_name, task = result
            print(f"Processing from {queue_name}: {task}")

С использованием Celery (рекомендуется)

Для серьёзных проектов используют Celery — это framework для распределённых очередей с Redis:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def send_email(email, subject):
    """Асинхронная задача отправки email"""
    print(f"Sending email to {email}: {subject}")
    # Логика отправки email
    return f"Email sent to {email}"

@app.task
def process_payment(user_id, amount):
    """Асинхронная обработка платежа"""
    print(f"Processing payment: {user_id} -> ${amount}")
    return True

# В коде отправляем задачу
from myapp.tasks import send_email, process_payment

# Асинхронный вызов (сразу вернёт)
send_email.delay('user@example.com', 'Hello!')
process_payment.delay(123, 99.99)

# С отложенным запуском
send_email.apply_async(
    args=('user@example.com', 'Hello'),
    countdown=60  # Запусти через 60 секунд
)

Worker (обработчик Celery):

celery -A myapp worker --loglevel=info

Преимущества очередей в Redis

  1. Масштабируемость — можешь добавить столько consumer'ов, сколько нужно
  2. Надёжность — задачи хранятся в Redis (не теряются)
  3. Скорость — Redis работает в памяти, очень быстро
  4. Простота — легко имплементировать простые очереди
  5. Persistence — Redis может сохранять данные на диск

Практический пример: Email рассылка

from flask import Flask
from flask_redis import FlaskRedis
import json
import threading

app = Flask(__name__)
app.config['REDIS_URL'] = 'redis://localhost:6379/0'
redis_client = FlaskRedis(app)

# Producer: добавляем email в очередь
@app.route('/send-newsletter', methods=['POST'])
def send_newsletter():
    emails = ['user1@example.com', 'user2@example.com', 'user3@example.com']
    
    for email in emails:
        task = {
            'email': email,
            'subject': 'Weekly Newsletter',
            'body': 'Check our latest articles...'
        }
        redis_client.rpush('email_queue', json.dumps(task))
    
    return {'status': 'Queued', 'count': len(emails)}

# Consumer: обрабатывает email
def email_worker():
    while True:
        result = redis_client.blpop('email_queue', timeout=1)
        if result:
            _, task_json = result
            task = json.loads(task_json)
            send_actual_email(task['email'], task['subject'], task['body'])

def send_actual_email(email, subject, body):
    print(f"Sending to {email}")
    # Логика отправки SMTP

# Запусти worker в отдельном потоке
worker_thread = threading.Thread(target=email_worker, daemon=True)
worker_thread.start()

Ограничения Redis очередей

  • Нет гарантии delivery при сбое Redis (используй persistence)
  • Нет встроенного scheduling (используй Celery или APScheduler)
  • Нет приоритетов по умолчанию (нужно несколько очередей)

Вывод

Очереди в Redis — это мощный инструмент для асинхронной обработки задач. Используй их для:

  • Email рассылок
  • Обработки платежей
  • Генерации отчётов
  • Долгоживущих вычислений
  • Обработки массивов данных

Для production систем рекомендую использовать Celery или RQ вместо чистого Redis.

Что такое очереди в Redis? | PrepBro