Реализовывал ли свой проект на Python
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Опыт разработки на Python
Да, я реализовал несколько проектов на Python, хотя Java остаётся моим основным языком. Этот опыт дал мне ценные знания о многоязычной разработке и умение выбирать правильный инструмент для решения конкретной задачи.
Проект 1: Data Pipeline для обработки логов
Контекст: На прошлой работе нам требовалась система обработки и анализа логов с нескольких микросервисов. Java был бы излишне тяжёлым для этой задачи.
Стек:
- Python 3.9 + FastAPI для REST API
- Pandas для обработки данных
- Elasticsearch для хранения и индексирования
- Apache Airflow для оркестрации pipeline
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import datetime
import logging
from elasticsearch import Elasticsearch
app = FastAPI()
es = Elasticsearch(["localhost:9200"])
class LogEntry(BaseModel):
service: str
level: str # INFO, WARN, ERROR
message: str
timestamp: datetime
@app.post("/api/v1/logs")
async def ingest_log(log: LogEntry):
"""Ingestion endpoint для логов из микросервисов"""
doc = {
"service": log.service,
"level": log.level,
"message": log.message,
"timestamp": log.timestamp,
"indexed_at": datetime.utcnow()
}
result = es.index(
index=f"logs-{log.service}",
document=doc
)
return {"status": "indexed", "id": result["_id"]}
@app.get("/api/v1/logs/errors/{service}")
async def get_service_errors(service: str, hours: int = 24):
"""Получить ошибки сервиса за последние N часов"""
query = {
"bool": {
"must": [
{"match": {"service": service}},
{"match": {"level": "ERROR"}},
{"range": {"timestamp": {"gte": f"now-{hours}h"}}}
]
}
}
result = es.search(index=f"logs-{service}", query=query, size=1000)
return {
"total": result["hits"]["total"]["value"],
"errors": [hit["_source"] for hit in result["hits"]["hits"]]
}
Apache Airflow для автоматизации анализа:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
from elasticsearch import Elasticsearch
def analyze_error_trends():
"""Анализ тенденций ошибок за день"""
es = Elasticsearch(["localhost:9200"])
# Получить все ошибки за день
query = {
"bool": {
"must": [
{"match": {"level": "ERROR"}},
{"range": {"timestamp": {"gte": "now-24h"}}}
]
}
}
result = es.search(index="logs-*", query=query, size=10000)
# Преобразовать в DataFrame
logs = [hit["_source"] for hit in result["hits"]["hits"]]
df = pd.DataFrame(logs)
# Анализ
errors_by_service = df.groupby("service").size().sort_values(ascending=False)
print(f"Ошибки по сервисам:\n{errors_by_service}")
# Сохранить отчёт
df.to_csv("/tmp/error_report.csv", index=False)
return errors_by_service.to_dict()
default_args = {
"owner": "data-team",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"start_date": datetime(2024, 1, 1),
}
dag = DAG(
"error_analysis_dag",
default_args=default_args,
schedule_interval="0 2 * * *", # В 2:00 ночи каждый день
catchup=False,
)
analyze_task = PythonOperator(
task_id="analyze_errors",
python_callable=analyze_error_trends,
dag=dag,
)
Проект 2: ML модель для рекомендаций
Контекст: Нужно было добавить систему рекомендаций товаров в e-commerce платформу.
Стек:
- Python 3.10
- scikit-learn + pandas для ML
- Flask для serving модели
- Docker для контейнеризации
from flask import Flask, request, jsonify
import joblib
import pandas as pd
from sklearn.preprocessing import StandardScaler
import logging
app = Flask(__name__)
logger = logging.getLogger(__name__)
# Загрузить обученную модель
model = joblib.load("/models/recommendation_model.pkl")
scaler = joblib.load("/models/scaler.pkl")
class RecommendationModel:
def __init__(self, model, scaler):
self.model = model
self.scaler = scaler
def predict(self, user_features: dict) -> list:
"""Предсказать рекомендации для пользователя"""
# Подготовить признаки
df = pd.DataFrame([user_features])
scaled_features = self.scaler.transform(df)
# Получить предсказание (scores товаров)
scores = self.model.predict(scaled_features)[0]
# Вернуть top-10 товаров
top_10_indices = scores.argsort()[-10:][::-1]
return top_10_indices.tolist()
recommender = RecommendationModel(model, scaler)
@app.post("/api/v1/recommend")
def recommend():
"""API для получения рекомендаций"""
try:
user_data = request.json
# Валидация
required_fields = ["user_id", "purchase_history", "browsing_history"]
if not all(field in user_data for field in required_fields):
return jsonify({"error": "Missing required fields"}), 400
# Подготовить признаки
features = {
"total_purchases": len(user_data["purchase_history"]),
"avg_price": sum(user_data["purchase_history"]) / len(user_data["purchase_history"]),
"browsing_count": len(user_data["browsing_history"]),
"user_score": user_data.get("score", 0),
}
# Получить рекомендации
recommendations = recommender.predict(features)
logger.info(f"Generated {len(recommendations)} recommendations for user {user_data[user_id]}")
return jsonify({
"user_id": user_data["user_id"],
"recommendations": recommendations,
"confidence": 0.92
})
except Exception as e:
logger.error(f"Error generating recommendations: {str(e)}")
return jsonify({"error": "Internal server error"}), 500
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=False)
Проект 3: CLI инструмент для миграции данных
Контекст: Нужно было мигрировать данные из старой БД в новую во время перехода на микросервисы.
import click
import psycopg2
from psycopg2.extras import RealDictCursor
import logging
from tqdm import tqdm
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataMigration:
def __init__(self, source_dsn: str, target_dsn: str):
self.source_dsn = source_dsn
self.target_dsn = target_dsn
def migrate_users(self):
"""Мигрировать пользователей из монолита в Users Service"""
source_conn = psycopg2.connect(self.source_dsn)
target_conn = psycopg2.connect(self.target_dsn)
try:
with source_conn.cursor(cursor_factory=RealDictCursor) as source_cursor:
source_cursor.execute("SELECT id, email, name, created_at FROM users")
users = source_cursor.fetchall()
logger.info(f"Found {len(users)} users to migrate")
with target_conn.cursor() as target_cursor:
for user in tqdm(users, desc="Migrating users"):
target_cursor.execute(
"""INSERT INTO users (id, email, name, created_at)
VALUES (%s, %s, %s, %s)
ON CONFLICT (id) DO NOTHING""",
(user["id"], user["email"], user["name"], user["created_at"])
)
target_conn.commit()
logger.info(f"Successfully migrated {len(users)} users")
except Exception as e:
target_conn.rollback()
logger.error(f"Migration failed: {str(e)}")
raise
finally:
source_conn.close()
target_conn.close()
@click.group()
def cli():
"""Data Migration Tool"""
pass
@cli.command()
@click.option("--source", required=True, help="Source database DSN")
@click.option("--target", required=True, help="Target database DSN")
def migrate(source, target):
"""Run data migration"""
migration = DataMigration(source, target)
click.echo("Starting data migration...")
migration.migrate_users()
click.echo("Migration completed successfully!")
if __name__ == "__main__":
cli()
Почему я выбираю Python для определённых задач
| Задача | Python | Java |
|---|---|---|
| Data Pipeline | ✓ Отличный | Тяжёлый |
| ML/AI | ✓ Лучший выбор | Сложный |
| DevOps/Automation | ✓ Естественный | Оverkill |
| Real-time система | Медленный | ✓ Предпочтительно |
| REST API | ✓ Хороший | ✓ Хороший |
| Микросервисы | Слабый | ✓ Идеален |
Как это дополняет мой Java опыт
- Полнофункциональный разработчик — я могу выбрать инструмент на основе задачи, а не языка
- Интеграция систем — опыт работы с Python microservices помогает в интеграции с Java сервисами
- Быстрое прототипирование — Python опыт помогает быстро разрабатывать POC до реализации на Java
- Мониторинг и аналитика — Python инструменты (pandas, matplotlib) я использую для анализа метрик Java приложений
Этот опыт демонстрирует мою способность быстро адаптироваться к новым технологиям и выбирать оптимальное решение для каждой задачи.