← Назад к вопросам

Что такое publish?

2.0 Middle🔥 161 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Publish в контексте программирования

Publish (опубликовать) — это один из ключевых концептов в асинхронном программировании и системах обмена сообщениями. В контексте Python разработки, это обычно означает отправку сообщения в очередь или канал для других приложений, микросервисов или слушателей.

Основной концепт: Pub/Sub (Publish-Subscribe)

Паттерн Publish-Subscribe — это архитектурный паттерн, в котором:

  • Publisher (издатель) отправляет сообщения в channel (канал) или topic (тему)
  • Subscriber (подписчик) слушает этот канал и получает сообщения
  • Publisher и Subscriber не знают друг о друге
Publisher → Message Broker (Redis/RabbitMQ) → Subscriber 1
                                            → Subscriber 2
                                            → Subscriber 3

Publish в Redis

Redis — популярная система для Pub/Sub:

import redis

# Подключаемся к Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Publish: отправляем сообщение в канал
result = r.publish('notifications', 'Hello, subscribers!')
print(f"Сообщение отправлено {result} подписчикам")

# Пример с данными
user_event = {
    'event': 'user_registered',
    'user_id': 123,
    'email': 'user@example.com'
}

import json
r.publish('user_events', json.dumps(user_event))

Subscriber в Redis

import redis
import json

# Создаём подписчика
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = r.pubsub()

# Подписываемся на канал
pubsub.subscribe('user_events')

print("Слушаю события...")
for message in pubsub.listen():
    if message['type'] == 'message':
        # Получили сообщение от publisher'а
        data = json.loads(message['data'])
        print(f"Получено событие: {data}")
        
        if data['event'] == 'user_registered':
            print(f"Новый пользователь: {data['email']}")

Асинхронный Pub/Sub с aioredis

import asyncio
import aioredis
import json

async def publisher():
    """Издатель отправляет сообщения"""
    redis = await aioredis.create_redis_pool('redis://localhost')
    
    for i in range(5):
        message = {'id': i, 'text': f'Message {i}'}
        await redis.publish('my_channel', json.dumps(message))
        print(f"Опубликовано: {message}")
        await asyncio.sleep(1)
    
    redis.close()
    await redis.wait_closed()

async def subscriber():
    """Подписчик получает сообщения"""
    redis = await aioredis.create_redis_pool('redis://localhost')
    ch = await redis.subscribe('my_channel')
    
    async for message in ch[0].iter():
        data = json.loads(message.decode())
        print(f"Подписчик получил: {data}")
    
    redis.close()
    await redis.wait_closed()

# Запускаем издателя и подписчика параллельно
async def main():
    await asyncio.gather(
        publisher(),
        subscriber()
    )

# asyncio.run(main())

Publish в RabbitMQ

RabbitMQ — более мощный message broker с дополнительными функциями:

import pika
import json

# Подключаемся к RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Объявляем exchange (точка обмена)
channel.exchange_declare(exchange='user_events', exchange_type='topic')

# Publish: отправляем сообщение
message = {
    'user_id': 123,
    'action': 'login',
    'timestamp': '2024-03-23T10:30:00'
}

channel.basic_publish(
    exchange='user_events',
    routing_key='user.login',  # Тема сообщения
    body=json.dumps(message),
    properties=pika.BasicProperties(
        content_type='application/json',
        delivery_mode=2  # Persistent message
    )
)

print("Сообщение опубликовано в RabbitMQ")
connection.close()

Subscriber в RabbitMQ

import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Объявляем exchange
channel.exchange_declare(exchange='user_events', exchange_type='topic')

# Создаём очередь
queue_result = channel.queue_declare(queue='', exclusive=True)
queue_name = queue_result.method.queue

# Привязываем очередь к exchange с routing key
channel.queue_bind(
    exchange='user_events',
    queue=queue_name,
    routing_key='user.*'  # Слушаем все события user.*
)

def callback(ch, method, properties, body):
    """Обработчик полученного сообщения"""
    data = json.loads(body)
    print(f"Получено событие: {data}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

print('Слушаю события... (Press Ctrl+C to stop)')
channel.start_consuming()

Publish в Kafka

Kafka — распределённая система для потоков данных:

from kafka import KafkaProducer
import json

# Создаём producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Publish: отправляем сообщение в topic
event = {'user_id': 123, 'action': 'purchase', 'amount': 99.99}
future = producer.send('purchases', event)

# Ждём подтверждения
try:
    record_metadata = future.get(timeout=10)
    print(f"Сообщение отправлено в {record_metadata.topic}")
except Exception as e:
    print(f"Ошибка: {e}")

producer.close()

Subscriber в Kafka

from kafka import KafkaConsumer
import json

# Создаём consumer
consumer = KafkaConsumer(
    'purchases',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='analytics_group',
    auto_offset_reset='earliest'
)

print("Слушаю события покупок...")
for message in consumer:
    event = message.value
    print(f"Получена покупка: пользователь {event['user_id']}, сумма {event['amount']}")

Publish в Django Signals (ORM уровень)

Django предоставляет собственную систему сигналов:

from django.db.models.signals import post_save
from django.dispatch import receiver
from myapp.models import User

# Отправляем сигнал (publish)
@receiver(post_save, sender=User)
def user_created(sender, instance, created, **kwargs):
    """Этот сигнал срабатывает после сохранения User"""
    if created:
        print(f"Новый пользователь создан: {instance.email}")
        # Здесь можно отправить письмо, добавить в очередь и т.д.

# В других местах кода можно подписаться на этот сигнал
from django.core.signals import request_finished

@receiver(request_finished)
def log_request(sender, **kwargs):
    print("Запрос завершён")

Практический пример: система уведомлений

import asyncio
import aioredis
import json

class NotificationService:
    def __init__(self):
        self.redis = None
    
    async def init(self):
        self.redis = await aioredis.create_redis_pool('redis://localhost')
    
    async def publish_notification(self, user_id: int, message: str):
        """Publish уведомление пользователю"""
        notification = {
            'user_id': user_id,
            'message': message,
            'timestamp': asyncio.get_event_loop().time()
        }
        await self.redis.publish(
            f'user:{user_id}:notifications',
            json.dumps(notification)
        )
    
    async def subscribe_notifications(self, user_id: int):
        """Subscribe на уведомления пользователя"""
        channel = await self.redis.subscribe(f'user:{user_id}:notifications')
        
        async for message in channel[0].iter():
            notification = json.loads(message.decode())
            yield notification

# Использование
async def main():
    service = NotificationService()
    await service.init()
    
    # Отправляем уведомление
    await service.publish_notification(1, "Hello, user!")
    
    # Слушаем уведомления
    async for notification in service.subscribe_notifications(1):
        print(f"Уведомление: {notification['message']}")

Типичные use cases для publish

  1. Event-driven архитектура — микросервисы обмениваются событиями
  2. Нотификации — отправка уведомлений пользователям
  3. Логирование — централизованное логирование
  4. Analytics — отправка событий аналитике
  5. Cache invalidation — инвалидация кэша в распределённой системе

Best practices

  1. Используйте JSON для структурированных сообщений
  2. Добавляйте metadata (timestamp, source, version)
  3. Обрабатывайте ошибки в subscriber'ах
  4. Используйте очереди для гарантии доставки
  5. Логируйте события для отладки

Publish — это мощный паттерн для создания слабо связанных, масштабируемых систем.

Что такое publish? | PrepBro