Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Publish в контексте программирования
Publish (опубликовать) — это один из ключевых концептов в асинхронном программировании и системах обмена сообщениями. В контексте Python разработки, это обычно означает отправку сообщения в очередь или канал для других приложений, микросервисов или слушателей.
Основной концепт: Pub/Sub (Publish-Subscribe)
Паттерн Publish-Subscribe — это архитектурный паттерн, в котором:
- Publisher (издатель) отправляет сообщения в channel (канал) или topic (тему)
- Subscriber (подписчик) слушает этот канал и получает сообщения
- Publisher и Subscriber не знают друг о друге
Publisher → Message Broker (Redis/RabbitMQ) → Subscriber 1
→ Subscriber 2
→ Subscriber 3
Publish в Redis
Redis — популярная система для Pub/Sub:
import redis
# Подключаемся к Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Publish: отправляем сообщение в канал
result = r.publish('notifications', 'Hello, subscribers!')
print(f"Сообщение отправлено {result} подписчикам")
# Пример с данными
user_event = {
'event': 'user_registered',
'user_id': 123,
'email': 'user@example.com'
}
import json
r.publish('user_events', json.dumps(user_event))
Subscriber в Redis
import redis
import json
# Создаём подписчика
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = r.pubsub()
# Подписываемся на канал
pubsub.subscribe('user_events')
print("Слушаю события...")
for message in pubsub.listen():
if message['type'] == 'message':
# Получили сообщение от publisher'а
data = json.loads(message['data'])
print(f"Получено событие: {data}")
if data['event'] == 'user_registered':
print(f"Новый пользователь: {data['email']}")
Асинхронный Pub/Sub с aioredis
import asyncio
import aioredis
import json
async def publisher():
"""Издатель отправляет сообщения"""
redis = await aioredis.create_redis_pool('redis://localhost')
for i in range(5):
message = {'id': i, 'text': f'Message {i}'}
await redis.publish('my_channel', json.dumps(message))
print(f"Опубликовано: {message}")
await asyncio.sleep(1)
redis.close()
await redis.wait_closed()
async def subscriber():
"""Подписчик получает сообщения"""
redis = await aioredis.create_redis_pool('redis://localhost')
ch = await redis.subscribe('my_channel')
async for message in ch[0].iter():
data = json.loads(message.decode())
print(f"Подписчик получил: {data}")
redis.close()
await redis.wait_closed()
# Запускаем издателя и подписчика параллельно
async def main():
await asyncio.gather(
publisher(),
subscriber()
)
# asyncio.run(main())
Publish в RabbitMQ
RabbitMQ — более мощный message broker с дополнительными функциями:
import pika
import json
# Подключаемся к RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем exchange (точка обмена)
channel.exchange_declare(exchange='user_events', exchange_type='topic')
# Publish: отправляем сообщение
message = {
'user_id': 123,
'action': 'login',
'timestamp': '2024-03-23T10:30:00'
}
channel.basic_publish(
exchange='user_events',
routing_key='user.login', # Тема сообщения
body=json.dumps(message),
properties=pika.BasicProperties(
content_type='application/json',
delivery_mode=2 # Persistent message
)
)
print("Сообщение опубликовано в RabbitMQ")
connection.close()
Subscriber в RabbitMQ
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем exchange
channel.exchange_declare(exchange='user_events', exchange_type='topic')
# Создаём очередь
queue_result = channel.queue_declare(queue='', exclusive=True)
queue_name = queue_result.method.queue
# Привязываем очередь к exchange с routing key
channel.queue_bind(
exchange='user_events',
queue=queue_name,
routing_key='user.*' # Слушаем все события user.*
)
def callback(ch, method, properties, body):
"""Обработчик полученного сообщения"""
data = json.loads(body)
print(f"Получено событие: {data}")
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
print('Слушаю события... (Press Ctrl+C to stop)')
channel.start_consuming()
Publish в Kafka
Kafka — распределённая система для потоков данных:
from kafka import KafkaProducer
import json
# Создаём producer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Publish: отправляем сообщение в topic
event = {'user_id': 123, 'action': 'purchase', 'amount': 99.99}
future = producer.send('purchases', event)
# Ждём подтверждения
try:
record_metadata = future.get(timeout=10)
print(f"Сообщение отправлено в {record_metadata.topic}")
except Exception as e:
print(f"Ошибка: {e}")
producer.close()
Subscriber в Kafka
from kafka import KafkaConsumer
import json
# Создаём consumer
consumer = KafkaConsumer(
'purchases',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='analytics_group',
auto_offset_reset='earliest'
)
print("Слушаю события покупок...")
for message in consumer:
event = message.value
print(f"Получена покупка: пользователь {event['user_id']}, сумма {event['amount']}")
Publish в Django Signals (ORM уровень)
Django предоставляет собственную систему сигналов:
from django.db.models.signals import post_save
from django.dispatch import receiver
from myapp.models import User
# Отправляем сигнал (publish)
@receiver(post_save, sender=User)
def user_created(sender, instance, created, **kwargs):
"""Этот сигнал срабатывает после сохранения User"""
if created:
print(f"Новый пользователь создан: {instance.email}")
# Здесь можно отправить письмо, добавить в очередь и т.д.
# В других местах кода можно подписаться на этот сигнал
from django.core.signals import request_finished
@receiver(request_finished)
def log_request(sender, **kwargs):
print("Запрос завершён")
Практический пример: система уведомлений
import asyncio
import aioredis
import json
class NotificationService:
def __init__(self):
self.redis = None
async def init(self):
self.redis = await aioredis.create_redis_pool('redis://localhost')
async def publish_notification(self, user_id: int, message: str):
"""Publish уведомление пользователю"""
notification = {
'user_id': user_id,
'message': message,
'timestamp': asyncio.get_event_loop().time()
}
await self.redis.publish(
f'user:{user_id}:notifications',
json.dumps(notification)
)
async def subscribe_notifications(self, user_id: int):
"""Subscribe на уведомления пользователя"""
channel = await self.redis.subscribe(f'user:{user_id}:notifications')
async for message in channel[0].iter():
notification = json.loads(message.decode())
yield notification
# Использование
async def main():
service = NotificationService()
await service.init()
# Отправляем уведомление
await service.publish_notification(1, "Hello, user!")
# Слушаем уведомления
async for notification in service.subscribe_notifications(1):
print(f"Уведомление: {notification['message']}")
Типичные use cases для publish
- Event-driven архитектура — микросервисы обмениваются событиями
- Нотификации — отправка уведомлений пользователям
- Логирование — централизованное логирование
- Analytics — отправка событий аналитике
- Cache invalidation — инвалидация кэша в распределённой системе
Best practices
- Используйте JSON для структурированных сообщений
- Добавляйте metadata (timestamp, source, version)
- Обрабатывайте ошибки в subscriber'ах
- Используйте очереди для гарантии доставки
- Логируйте события для отладки
Publish — это мощный паттерн для создания слабо связанных, масштабируемых систем.