Что такое очереди в Redis?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Очереди в Redis
Очереди (queues) в Redis — это структуры данных, которые позволяют хранить и обрабатывать задачи в порядке FIFO (First In, First Out). Это один из самых мощных инструментов Redis для масштабируемых систем.
Суть очереди
Очередь работает как обычная ввода-вывода:
- Один или несколько producers (производители) добавляют задачи в конец очереди
- Один или несколько consumers (потребители) берут задачи с начала очереди
- Задачи обрабатываются по мере их получения
Базовые команды Redis
import redis
# Подключаемся к Redis
red = redis.Redis(host='localhost', port=6379, decode_responses=True)
# RPUSH — добавить в конец очереди
red.rpush('task_queue', 'send_email')
red.rpush('task_queue', 'process_payment')
red.rpush('task_queue', 'generate_report')
# LPUSH — добавить в начало (для приоритета)
red.lpush('priority_queue', 'urgent_task')
# LLEN — получить длину очереди
queue_size = red.llen('task_queue') # 3
# LPOP — взять задачу с начала (удаляет)
task = red.lpop('task_queue') # 'send_email'
# LRANGE — посмотреть все элементы
all_tasks = red.lrange('task_queue', 0, -1) # ['process_payment', 'generate_report']
# LTRIM — оставить только N элементов
red.ltrim('task_queue', 0, 99) # Оставить первые 100 элементов
Producer-Consumer паттерн
Producer (отправитель задач):
import redis
import json
red = redis.Redis()
def send_task(task_name, data):
"""Отправляет задачу в очередь"""
task = {
'name': task_name,
'data': data
}
red.rpush('tasks', json.dumps(task))
print(f"Task sent: {task_name}")
# Отправляем несколько задач
send_task('send_email', {'email': 'user@example.com', 'subject': 'Hello'})
send_task('process_payment', {'user_id': 123, 'amount': 99.99})
send_task('generate_report', {'report_type': 'monthly'})
Consumer (обработчик задач):
import redis
import json
import time
red = redis.Redis()
def process_tasks():
"""Непрерывно обрабатывает задачи из очереди"""
while True:
# BLPOP — блокирующее извлечение (ждёт, если очередь пуста)
result = red.blpop('tasks', timeout=1) # Ждёт 1 сек
if result is None:
print("Нет задач, жду...")
continue
queue_name, task_json = result
task = json.loads(task_json)
try:
print(f"Processing: {task['name']}")
if task['name'] == 'send_email':
send_email(task['data'])
elif task['name'] == 'process_payment':
process_payment(task['data'])
elif task['name'] == 'generate_report':
generate_report(task['data'])
print(f"Completed: {task['name']}")
except Exception as e:
print(f"Error: {e}")
# Можно отправить задачу в очередь ошибок
red.rpush('failed_tasks', task_json)
if __name__ == '__main__':
process_tasks()
BLPOP vs LPOP
LPOP — неблокирующее, возвращает сразу:
task = red.lpop('queue') # Если очередь пуста, вернёт None
BLPOP — блокирующее, ждёт задачу:
# Ждёт до 5 секунд, пока не появится задача
result = red.blpop('queue', timeout=5)
if result:
queue_name, task = result
else:
print("Timeout: нет задач")
BLPOP намного эффективнее для consumer'ов, потому что не тратит CPU на постоянную проверку.
Обработка ошибок и retry
def process_with_retry(queue_name, max_retries=3):
while True:
result = red.blpop(queue_name, timeout=1)
if not result:
continue
_, task_json = result
task = json.loads(task_json)
retries = task.get('retries', 0)
try:
handle_task(task)
except Exception as e:
if retries < max_retries:
print(f"Retry {retries + 1}/{max_retries}")
task['retries'] = retries + 1
# Отправляем обратно в очередь
red.rpush(queue_name, json.dumps(task))
else:
print(f"Failed after {max_retries} retries")
# Отправляем в очередь мёртвых писем
red.rpush('dead_letter_queue', json.dumps(task))
Несколько очередей (приоритеты)
def process_with_priority():
"""Обрабатывает приоритетные очереди"""
while True:
# Проверяем очереди в порядке приоритета
# BLPOP поддерживает несколько ключей!
result = red.blpop(
['priority_high', 'priority_normal', 'priority_low'],
timeout=1
)
if result:
queue_name, task = result
print(f"Processing from {queue_name}: {task}")
С использованием Celery (рекомендуется)
Для серьёзных проектов используют Celery — это framework для распределённых очередей с Redis:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379')
@app.task
def send_email(email, subject):
"""Асинхронная задача отправки email"""
print(f"Sending email to {email}: {subject}")
# Логика отправки email
return f"Email sent to {email}"
@app.task
def process_payment(user_id, amount):
"""Асинхронная обработка платежа"""
print(f"Processing payment: {user_id} -> ${amount}")
return True
# В коде отправляем задачу
from myapp.tasks import send_email, process_payment
# Асинхронный вызов (сразу вернёт)
send_email.delay('user@example.com', 'Hello!')
process_payment.delay(123, 99.99)
# С отложенным запуском
send_email.apply_async(
args=('user@example.com', 'Hello'),
countdown=60 # Запусти через 60 секунд
)
Worker (обработчик Celery):
celery -A myapp worker --loglevel=info
Преимущества очередей в Redis
- Масштабируемость — можешь добавить столько consumer'ов, сколько нужно
- Надёжность — задачи хранятся в Redis (не теряются)
- Скорость — Redis работает в памяти, очень быстро
- Простота — легко имплементировать простые очереди
- Persistence — Redis может сохранять данные на диск
Практический пример: Email рассылка
from flask import Flask
from flask_redis import FlaskRedis
import json
import threading
app = Flask(__name__)
app.config['REDIS_URL'] = 'redis://localhost:6379/0'
redis_client = FlaskRedis(app)
# Producer: добавляем email в очередь
@app.route('/send-newsletter', methods=['POST'])
def send_newsletter():
emails = ['user1@example.com', 'user2@example.com', 'user3@example.com']
for email in emails:
task = {
'email': email,
'subject': 'Weekly Newsletter',
'body': 'Check our latest articles...'
}
redis_client.rpush('email_queue', json.dumps(task))
return {'status': 'Queued', 'count': len(emails)}
# Consumer: обрабатывает email
def email_worker():
while True:
result = redis_client.blpop('email_queue', timeout=1)
if result:
_, task_json = result
task = json.loads(task_json)
send_actual_email(task['email'], task['subject'], task['body'])
def send_actual_email(email, subject, body):
print(f"Sending to {email}")
# Логика отправки SMTP
# Запусти worker в отдельном потоке
worker_thread = threading.Thread(target=email_worker, daemon=True)
worker_thread.start()
Ограничения Redis очередей
- Нет гарантии delivery при сбое Redis (используй persistence)
- Нет встроенного scheduling (используй Celery или APScheduler)
- Нет приоритетов по умолчанию (нужно несколько очередей)
Вывод
Очереди в Redis — это мощный инструмент для асинхронной обработки задач. Используй их для:
- Email рассылок
- Обработки платежей
- Генерации отчётов
- Долгоживущих вычислений
- Обработки массивов данных
Для production систем рекомендую использовать Celery или RQ вместо чистого Redis.