← Назад к вопросам
Как подключаться к топикам Apache Kafka?
2.2 Middle🔥 161 комментариев
#Apache Kafka и потоковая обработка
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Подключение к топикам Apache Kafka
Подключение к Kafka топикам — это базовый навык Data Engineer. Есть несколько способов в зависимости от языка, потребностей и архитектуры.
1. Использование Python (kafka-python)
kafka-python — наиболее популярная библиотека для работы с Kafka в Python:
from kafka import KafkaConsumer, KafkaProducer
import json
# Подключение как консьюмер
consumer = KafkaConsumer(
'orders', # Топик
'payments', # Может слушать несколько топиков
bootstrap_servers=['kafka1:9092', 'kafka2:9092'], # Cluster nodes
group_id='analytics_group', # Группа для offset management
auto_offset_reset='earliest', # С начала, если нет saved offset
enable_auto_commit=True, # Auto-commit offsets
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
key_deserializer=lambda x: x.decode('utf-8') if x else None
)
# Обработка сообщений
for message in consumer:
print(f"Topic: {message.topic}")
print(f"Partition: {message.partition}")
print(f"Offset: {message.offset}")
print(f"Key: {message.key}")
print(f"Value: {message.value}")
print(f"Timestamp: {message.timestamp}")
2. Подключение как продюсер
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
acks='all', # Ждём подтверждения от всех реплик
retries=3,
compression_type='snappy'
)
# Отправка сообщения
order = {
'order_id': '12345',
'user_id': 'user_001',
'amount': 99.99,
'timestamp': '2024-01-15T10:30:00Z'
}
future = producer.send('orders', value=order, key='order_id')
# Ждём отправки
try:
record_metadata = future.get(timeout=10)
print(f"Message sent to {record_metadata.topic} "
f"partition {record_metadata.partition} "
f"offset {record_metadata.offset}")
except Exception as e:
print(f"Error: {e}")
producer.flush() # Отправить оставшиеся сообщения
producer.close()
3. Подключение через Docker
Частый сценарий — Kafka запущена в Docker:
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.0.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# Подключение из контейнера (service name как хост)
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['kafka:29092'] # Используем service name
)
4. Подключение с аутентификацией (SASL/SSL)
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import ssl
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['kafka.example.com:9092'],
# SASL аутентификация
security_protocol='SASL_SSL',
sasl_mechanism='PLAIN',
sasl_plain_username='username',
sasl_plain_password='password',
# SSL сертификаты
ssl_cafile='/path/to/ca-cert',
ssl_certfile='/path/to/client-cert',
ssl_keyfile='/path/to/client-key',
ssl_check_hostname=True,
group_id='secure_group'
)
5. Обработка ошибок при подключении
from kafka.errors import KafkaError
import time
def robust_consumer(topic, bootstrap_servers, max_retries=5):
"""Консьюмер с обработкой ошибок и переподключением"""
retries = 0
while retries < max_retries:
try:
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
group_id='robust_group',
auto_offset_reset='earliest',
session_timeout_ms=30000,
heartbeat_interval_ms=10000
)
for message in consumer:
try:
process_message(message)
except Exception as e:
print(f"Error processing message: {e}")
# Можем логировать в dead letter queue
send_to_dlq(message)
except KafkaError as e:
print(f"Kafka error: {e}")
retries += 1
wait_time = 2 ** retries # Exponential backoff
print(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
except Exception as e:
print(f"Unexpected error: {e}")
raise
raise Exception(f"Failed to connect after {max_retries} retries")
6. Работа с Command Line (kafkacat)
Когда нужно быстро проверить данные в топике:
# Установка
brew install kafkacat # macOS
sudo apt install kafkacat # Ubuntu
# Чтение из топика
kafkacat -b kafka:9092 -t orders -C
# Чтение с начала
kafkacat -b kafka:9092 -t orders -C -o 0
# Чтение последних 10 сообщений
kafkacat -b kafka:9092 -t orders -C -o -10
# Форматирование JSON
kafkacat -b kafka:9092 -t orders -C -f 'Topic: %t, Partition: %p, Offset: %o, Value: %s\n'
# Отправка сообщения
echo '{"order_id": "12345"}' | kafkacat -b kafka:9092 -t orders -P
7. Использование Confluent Kafka Client
from confluent_kafka import Consumer, Producer
import json
# Более продвинутый и быстрый клиент
# Консьюмер
conf = {
'bootstrap.servers': 'kafka1:9092,kafka2:9092',
'group.id': 'analytics',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True
}
consumer = Consumer(conf)
consumer.subscribe(['orders'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
print(f"Received: {msg.value().decode('utf-8')}")
finally:
consumer.close()
# Продюсер
conf = {
'bootstrap.servers': 'kafka1:9092,kafka2:9092',
'acks': 'all'
}
producer = Producer(conf)
def delivery_report(err, msg):
if err is not None:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to {msg.topic()}')
order = {'order_id': '12345', 'amount': 99.99}
producer.produce('orders',
value=json.dumps(order).encode('utf-8'),
callback=delivery_report)
producer.flush()
producer.close()
8. Практический пример: ETL пайплайн
from kafka import KafkaConsumer, KafkaProducer
import json
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class KafkaETL:
"""ETL обработчик на Kafka"""
def __init__(self, source_topic, dest_topic, bootstrap_servers):
self.source_topic = source_topic
self.dest_topic = dest_topic
# Консьюмер из source
self.consumer = KafkaConsumer(
source_topic,
bootstrap_servers=bootstrap_servers,
group_id='etl_group',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
# Продюсер в destination
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
def transform(self, message):
"""Бизнес-логика трансформации"""
data = message.value
# Пример трансформации
return {
'original_id': data.get('id'),
'amount_usd': data.get('amount') * 0.85, # Конвертация
'processed_at': str(datetime.now()),
'source': message.topic
}
def run(self):
"""Запуск ETL"""
for message in self.consumer:
try:
transformed = self.transform(message)
self.producer.send(self.dest_topic, value=transformed)
logger.info(f"Processed message from offset {message.offset}")
except Exception as e:
logger.error(f"Error: {e}")
# Dead letter queue
self.producer.send('dlq_topic', value=message.value)
def close(self):
self.consumer.close()
self.producer.close()
# Использование
if __name__ == '__main__':
etl = KafkaETL(
source_topic='raw_orders',
dest_topic='processed_orders',
bootstrap_servers=['kafka:9092']
)
try:
etl.run()
finally:
etl.close()
Пошаговый чеклист подключения
-
Проверить доступность Kafka кластера
telnet kafka:9092 -
Убедиться, что топик существует
kafka-topics --bootstrap-server kafka:9092 --list -
Установить нужную библиотеку
pip install kafka-python # или pip install confluent-kafka -
Написать простой тест подключения
from kafka import KafkaConsumer consumer = KafkaConsumer( bootstrap_servers=['kafka:9092'], auto_offset_reset='earliest' ) consumer.subscribe(['test_topic']) for msg in consumer: print(msg.value) break # Выход после первого сообщения -
Добавить обработку ошибок и мониторинг
-
Настроить логирование и алерты