← Назад к вопросам

Как подключаться к топикам Apache Kafka?

2.2 Middle🔥 161 комментариев
#Apache Kafka и потоковая обработка

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Подключение к топикам Apache Kafka

Подключение к Kafka топикам — это базовый навык Data Engineer. Есть несколько способов в зависимости от языка, потребностей и архитектуры.

1. Использование Python (kafka-python)

kafka-python — наиболее популярная библиотека для работы с Kafka в Python:

from kafka import KafkaConsumer, KafkaProducer
import json

# Подключение как консьюмер
consumer = KafkaConsumer(
    'orders',  # Топик
    'payments',  # Может слушать несколько топиков
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],  # Cluster nodes
    group_id='analytics_group',  # Группа для offset management
    auto_offset_reset='earliest',  # С начала, если нет saved offset
    enable_auto_commit=True,  # Auto-commit offsets
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    key_deserializer=lambda x: x.decode('utf-8') if x else None
)

# Обработка сообщений
for message in consumer:
    print(f"Topic: {message.topic}")
    print(f"Partition: {message.partition}")
    print(f"Offset: {message.offset}")
    print(f"Key: {message.key}")
    print(f"Value: {message.value}")
    print(f"Timestamp: {message.timestamp}")

2. Подключение как продюсер

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
    acks='all',  # Ждём подтверждения от всех реплик
    retries=3,
    compression_type='snappy'
)

# Отправка сообщения
order = {
    'order_id': '12345',
    'user_id': 'user_001',
    'amount': 99.99,
    'timestamp': '2024-01-15T10:30:00Z'
}

future = producer.send('orders', value=order, key='order_id')

# Ждём отправки
try:
    record_metadata = future.get(timeout=10)
    print(f"Message sent to {record_metadata.topic} "
          f"partition {record_metadata.partition} "
          f"offset {record_metadata.offset}")
except Exception as e:
    print(f"Error: {e}")

producer.flush()  # Отправить оставшиеся сообщения
producer.close()

3. Подключение через Docker

Частый сценарий — Kafka запущена в Docker:

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  
  kafka:
    image: confluentinc/cp-kafka:7.0.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# Подключение из контейнера (service name как хост)
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka:29092']  # Используем service name
)

4. Подключение с аутентификацией (SASL/SSL)

from kafka import KafkaConsumer
from kafka.errors import KafkaError
import ssl

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka.example.com:9092'],
    
    # SASL аутентификация
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='username',
    sasl_plain_password='password',
    
    # SSL сертификаты
    ssl_cafile='/path/to/ca-cert',
    ssl_certfile='/path/to/client-cert',
    ssl_keyfile='/path/to/client-key',
    ssl_check_hostname=True,
    
    group_id='secure_group'
)

5. Обработка ошибок при подключении

from kafka.errors import KafkaError
import time

def robust_consumer(topic, bootstrap_servers, max_retries=5):
    """Консьюмер с обработкой ошибок и переподключением"""
    
    retries = 0
    while retries < max_retries:
        try:
            consumer = KafkaConsumer(
                topic,
                bootstrap_servers=bootstrap_servers,
                group_id='robust_group',
                auto_offset_reset='earliest',
                session_timeout_ms=30000,
                heartbeat_interval_ms=10000
            )
            
            for message in consumer:
                try:
                    process_message(message)
                except Exception as e:
                    print(f"Error processing message: {e}")
                    # Можем логировать в dead letter queue
                    send_to_dlq(message)
        
        except KafkaError as e:
            print(f"Kafka error: {e}")
            retries += 1
            wait_time = 2 ** retries  # Exponential backoff
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
        
        except Exception as e:
            print(f"Unexpected error: {e}")
            raise
    
    raise Exception(f"Failed to connect after {max_retries} retries")

6. Работа с Command Line (kafkacat)

Когда нужно быстро проверить данные в топике:

# Установка
brew install kafkacat  # macOS
sudo apt install kafkacat  # Ubuntu

# Чтение из топика
kafkacat -b kafka:9092 -t orders -C

# Чтение с начала
kafkacat -b kafka:9092 -t orders -C -o 0

# Чтение последних 10 сообщений
kafkacat -b kafka:9092 -t orders -C -o -10

# Форматирование JSON
kafkacat -b kafka:9092 -t orders -C -f 'Topic: %t, Partition: %p, Offset: %o, Value: %s\n'

# Отправка сообщения
echo '{"order_id": "12345"}' | kafkacat -b kafka:9092 -t orders -P

7. Использование Confluent Kafka Client

from confluent_kafka import Consumer, Producer
import json

# Более продвинутый и быстрый клиент

# Консьюмер
conf = {
    'bootstrap.servers': 'kafka1:9092,kafka2:9092',
    'group.id': 'analytics',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        
        if msg is None:
            continue
        
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        
        print(f"Received: {msg.value().decode('utf-8')}")

finally:
    consumer.close()

# Продюсер
conf = {
    'bootstrap.servers': 'kafka1:9092,kafka2:9092',
    'acks': 'all'
}

producer = Producer(conf)

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()}')

order = {'order_id': '12345', 'amount': 99.99}
producer.produce('orders', 
                 value=json.dumps(order).encode('utf-8'),
                 callback=delivery_report)

producer.flush()
producer.close()

8. Практический пример: ETL пайплайн

from kafka import KafkaConsumer, KafkaProducer
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class KafkaETL:
    """ETL обработчик на Kafka"""
    
    def __init__(self, source_topic, dest_topic, bootstrap_servers):
        self.source_topic = source_topic
        self.dest_topic = dest_topic
        
        # Консьюмер из source
        self.consumer = KafkaConsumer(
            source_topic,
            bootstrap_servers=bootstrap_servers,
            group_id='etl_group',
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        
        # Продюсер в destination
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )
    
    def transform(self, message):
        """Бизнес-логика трансформации"""
        data = message.value
        
        # Пример трансформации
        return {
            'original_id': data.get('id'),
            'amount_usd': data.get('amount') * 0.85,  # Конвертация
            'processed_at': str(datetime.now()),
            'source': message.topic
        }
    
    def run(self):
        """Запуск ETL"""
        for message in self.consumer:
            try:
                transformed = self.transform(message)
                self.producer.send(self.dest_topic, value=transformed)
                logger.info(f"Processed message from offset {message.offset}")
            except Exception as e:
                logger.error(f"Error: {e}")
                # Dead letter queue
                self.producer.send('dlq_topic', value=message.value)
    
    def close(self):
        self.consumer.close()
        self.producer.close()

# Использование
if __name__ == '__main__':
    etl = KafkaETL(
        source_topic='raw_orders',
        dest_topic='processed_orders',
        bootstrap_servers=['kafka:9092']
    )
    
    try:
        etl.run()
    finally:
        etl.close()

Пошаговый чеклист подключения

  1. Проверить доступность Kafka кластера

    telnet kafka:9092
    
  2. Убедиться, что топик существует

    kafka-topics --bootstrap-server kafka:9092 --list
    
  3. Установить нужную библиотеку

    pip install kafka-python
    # или
    pip install confluent-kafka
    
  4. Написать простой тест подключения

    from kafka import KafkaConsumer
    consumer = KafkaConsumer(
        bootstrap_servers=['kafka:9092'],
        auto_offset_reset='earliest'
    )
    consumer.subscribe(['test_topic'])
    for msg in consumer:
        print(msg.value)
        break  # Выход после первого сообщения
    
  5. Добавить обработку ошибок и мониторинг

  6. Настроить логирование и алерты