What style did you write your Python code in, and what technologies did you use?
Python code, style, and technologies in my work
Over 10+ years I have evolved from messy spaghetti scripts to production-grade code. Here is my approach, along with the tools I use.
1. Coding style
From mess to clean code
# BEFORE (2010s): spaghetti
def load_data():
    import pandas
    import sqlalchemy
    # Everything in one function
    df = pandas.read_csv('data.csv')
    # No error handling
    conn = sqlalchemy.create_engine('postgresql://...')
    for idx, row in df.iterrows():
        # Slow: row-by-row inserts
        conn.execute(f"INSERT INTO table VALUES ({row})")
    # No logging
    return True
# NOW: clean code
from dataclasses import dataclass
from typing import Iterator
import logging

logger = logging.getLogger(__name__)


@dataclass
class DataLoadConfig:
    """Configuration for data loading"""
    source_path: str
    target_schema: str
    batch_size: int = 5000
    validate: bool = True


class DataLoader:
    """Responsible for loading data from CSV to database"""

    def __init__(self, config: DataLoadConfig):
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)

    def load(self) -> dict[str, int]:
        """Load data with proper error handling and monitoring"""
        try:
            rows_loaded = 0
            rows_failed = 0
            for batch in self._read_batches():
                try:
                    if self.config.validate:
                        self._validate_batch(batch)
                    self._insert_batch(batch)
                    rows_loaded += len(batch)
                except ValueError as e:
                    rows_failed += len(batch)
                    self.logger.warning(f'Batch validation failed: {e}')
                    # Optionally: save to error table (helper omitted here)
                    self._save_to_error_table(batch, str(e))
            self.logger.info(f'Load completed: {rows_loaded} rows loaded, {rows_failed} failed')
            return {'loaded': rows_loaded, 'failed': rows_failed}
        except Exception as e:
            self.logger.error(f'Critical error during load: {e}', exc_info=True)
            raise

    def _read_batches(self) -> Iterator[list[dict]]:
        """Read CSV in batches to avoid memory overload"""
        import pandas as pd
        for chunk in pd.read_csv(
            self.config.source_path,
            chunksize=self.config.batch_size,
            dtype=str  # Avoid type inference issues
        ):
            yield chunk.to_dict('records')

    def _validate_batch(self, batch: list[dict]) -> None:
        """Validate batch before insertion"""
        for record in batch:
            if not record.get('id') or not record.get('name'):
                raise ValueError(f'Missing required fields in record: {record}')

    def _insert_batch(self, batch: list[dict]) -> None:
        """Insert batch using a bulk operation"""
        from sqlalchemy import text
        from .db import get_connection
        with get_connection() as conn:
            # Use a single bulk insert instead of row-by-row statements
            conn.execute(
                text(f"""
                    INSERT INTO {self.config.target_schema}.target_table
                    (id, name, data)
                    VALUES (:id, :name, :data)
                """),
                batch
            )
            conn.commit()
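A quick usage sketch (assuming logging is configured and the get_connection helper from src/db exists; the path and schema names are placeholders):

# Hypothetical invocation of the loader above
logging.basicConfig(level=logging.INFO)

config = DataLoadConfig(
    source_path='exports/customers.csv',   # placeholder path
    target_schema='staging',               # placeholder schema
    batch_size=10_000,
)
stats = DataLoader(config).load()
print(stats)  # e.g. {'loaded': 98500, 'failed': 1500}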
2. Architecture and patterns
Layered architecture (Clean Architecture)
# project_root/
# ├── src/
# │   ├── domain/              # Business logic
# │   │   ├── models.py        # Entities, Value Objects
# │   │   └── repositories.py  # Interfaces
# │   ├── application/         # Use cases, orchestration
# │   │   ├── services.py      # Business services
# │   │   └── use_cases.py     # User stories
# │   ├── infrastructure/      # External integrations
# │   │   ├── database.py      # DB implementations
# │   │   ├── api_client.py    # External APIs
# │   │   └── cache.py         # Caching layer
# │   └── presentation/        # CLI, HTTP, etc.
# │       └── cli.py           # Command-line interface
# └── tests/
# Domain Layer (business logic, independent of implementation details)
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from enum import Enum


class CustomerStatus(str, Enum):
    ACTIVE = 'active'
    INACTIVE = 'inactive'
    CHURNED = 'churned'


@dataclass
class Customer:
    """Domain Entity: represents a business concept"""
    id: str
    name: str
    email: str
    status: CustomerStatus
    created_at: datetime

    def is_active(self) -> bool:
        return self.status == CustomerStatus.ACTIVE

    def churn(self) -> None:
        """Business rule: mark the customer as churned"""
        self.status = CustomerStatus.CHURNED


class CustomerRepository(ABC):
    """Repository interface: how the data is persisted is not the domain's concern"""

    @abstractmethod
    def get_by_id(self, customer_id: str) -> Customer:
        pass

    @abstractmethod
    def save(self, customer: Customer) -> None:
        pass
# Application Layer (use cases)
class ChurnCustomerUseCase:
    """Marks a customer as churned"""

    def __init__(self, repository: CustomerRepository):
        self.repository = repository

    def execute(self, customer_id: str) -> dict:
        customer = self.repository.get_by_id(customer_id)
        customer.churn()
        self.repository.save(customer)
        return {'status': 'churned', 'customer_id': customer_id}
# Infrastructure Layer (implementation details)
from sqlalchemy.orm import Session

# CustomerModel is the SQLAlchemy ORM model for the customers table (definition omitted)


class CustomerRepositorySQL(CustomerRepository):
    """PostgreSQL implementation of the repository"""

    def __init__(self, session: Session):
        self.session = session

    def get_by_id(self, customer_id: str) -> Customer:
        row = self.session.query(CustomerModel).filter(
            CustomerModel.id == customer_id
        ).first()
        if not row:
            raise ValueError(f'Customer {customer_id} not found')
        return Customer(
            id=row.id,
            name=row.name,
            email=row.email,
            status=CustomerStatus(row.status),
            created_at=row.created_at
        )

    def save(self, customer: Customer) -> None:
        self.session.query(CustomerModel).filter(
            CustomerModel.id == customer.id
        ).update({'status': customer.status.value})
        self.session.commit()
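The layers are wired together at the composition root. A minimal sketch, where session_factory is an assumed helper that produces a SQLAlchemy Session:

# Composition root: infrastructure is injected into the use case, domain rules run inside
def churn_customer_endpoint(customer_id: str) -> dict:
    session = session_factory()                      # assumed session helper
    try:
        repository = CustomerRepositorySQL(session)  # infrastructure detail
        use_case = ChurnCustomerUseCase(repository)  # application layer
        return use_case.execute(customer_id)
    finally:
        session.close()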
3. Specific technologies
Data Processing
# Pandas for small/medium datasets
import pandas as pd

df = pd.read_csv('data.csv')
df['date'] = pd.to_datetime(df['date'])
df_grouped = df.groupby('category')['amount'].sum()

# PySpark for big data (> 5 GB)
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as spark_sum

spark = SparkSession.builder.appName('DataAnalysis').getOrCreate()
df_spark = spark.read.parquet('s3://bucket/data.parquet')
result = df_spark.groupBy('category').agg(spark_sum('amount')).collect()

# Polars for very fast processing (my current choice for ETL)
import polars as pl

df_polars = pl.read_csv('large_file.csv')
result = (
    df_polars
    .filter(pl.col('date') > '2024-01-01')   # filter before aggregating
    .group_by('category')                    # groupby() in older Polars releases
    .agg(pl.col('amount').sum())
)
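Polars also has a lazy API: scan_csv builds a query plan and collect() executes it, so filters are pushed down before the whole file is read. A sketch over the same hypothetical file and columns:

# Lazy variant: nothing is read until .collect() runs the optimized plan
lazy_result = (
    pl.scan_csv('large_file.csv')
    .filter(pl.col('date') > '2024-01-01')
    .group_by('category')        # groupby() in older Polars releases
    .agg(pl.col('amount').sum())
    .collect()
)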
Database Access
# SQLAlchemy ORM (for simple operations)
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()


class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)


engine = create_engine('postgresql://user:pass@localhost/db')
Base.metadata.create_all(engine)

with Session(engine) as session:
    user = session.query(User).filter(User.id == 1).first()

# Raw SQL (for complex queries)
from sqlalchemy import text

with Session(engine) as session:
    result = session.execute(text("""
        SELECT u.name, COUNT(o.id) AS order_count
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
        WHERE u.created_at > :cutoff_date
        GROUP BY u.id
    """), {'cutoff_date': '2024-01-01'})
    for row in result:
        print(f"{row.name}: {row.order_count} orders")

# SQLAlchemy Core (for ETL operations)
from sqlalchemy import insert, update, delete, select

# Bulk insert (fast): one statement instead of many single-row inserts
insert_stmt = insert(User).values([
    {'name': 'Alice', 'email': 'alice@example.com'},
    {'name': 'Bob', 'email': 'bob@example.com'},
])
with Session(engine) as session:
    session.execute(insert_stmt)
    session.commit()
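For idempotent ETL loads, SQLAlchemy Core also exposes PostgreSQL's ON CONFLICT upsert. A minimal sketch reusing the User model and engine from above:

# Upsert (PostgreSQL dialect): insert new rows, update existing ones by primary key
from sqlalchemy.dialects.postgresql import insert as pg_insert

base_stmt = pg_insert(User).values([
    {'id': 1, 'name': 'Alice', 'email': 'alice@new.example.com'},
])
upsert_stmt = base_stmt.on_conflict_do_update(
    index_elements=[User.id],
    set_={'email': base_stmt.excluded.email},
)
with Session(engine) as session:
    session.execute(upsert_stmt)
    session.commit()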
Async/Concurrent Processing
# asyncio for I/O-bound async operations
import asyncio
import aiohttp


async def fetch_data_async(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)


async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()
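The coroutines above are driven from synchronous code with asyncio.run; a minimal sketch with placeholder URLs:

# Entry point: run the async fetch from synchronous code
urls = ['https://api.example.com/a', 'https://api.example.com/b']  # placeholder URLs
payloads = asyncio.run(fetch_data_async(urls))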
# multiprocessing for CPU-bound tasks
from multiprocessing import Pool
from functools import partial


def expensive_calculation(data, factor=2):
    return sum(data) * factor


# chunk1..chunk4 are placeholder chunks of work
with Pool(4) as pool:
    results = pool.map(
        partial(expensive_calculation, factor=3),
        [chunk1, chunk2, chunk3, chunk4]
    )

# concurrent.futures for mixed workloads (threads for I/O, processes for CPU)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

with ThreadPoolExecutor(max_workers=5) as executor:
    # api_call and items are placeholders for an I/O-bound task and its inputs
    futures = [executor.submit(api_call, item) for item in items]
    results = [f.result() for f in futures]
Testing
# pytest for all tests
import pytest
from datetime import datetime
from unittest.mock import MagicMock


class TestChurnCustomerUseCase:

    @pytest.fixture
    def mock_repo(self):
        return MagicMock(spec=CustomerRepository)

    def test_churn_sets_status(self, mock_repo):
        # Arrange
        customer = Customer(
            id='123',
            name='John',
            email='john@example.com',
            status=CustomerStatus.ACTIVE,
            created_at=datetime.now()
        )
        mock_repo.get_by_id.return_value = customer
        use_case = ChurnCustomerUseCase(mock_repo)
        # Act
        result = use_case.execute('123')
        # Assert
        assert result['status'] == 'churned'
        mock_repo.save.assert_called_once()
        assert customer.status == CustomerStatus.CHURNED

    def test_churn_raises_on_not_found(self, mock_repo):
        mock_repo.get_by_id.side_effect = ValueError('Not found')
        use_case = ChurnCustomerUseCase(mock_repo)
        with pytest.raises(ValueError):
            use_case.execute('999')


# VCR.py for recording/replaying HTTP interactions in tests
import vcr


@vcr.use_cassette('fixtures/vcr_cassettes/api_call.yaml')
def test_external_api_call():
    client = APIClient('https://api.example.com')  # APIClient: the project's HTTP client wrapper
    response = client.get_data('resource')
    assert response['status'] == 'success'
Logging and Monitoring
import logging
import sys
from pythonjsonlogger import jsonlogger

# JSON logging for production
logger = logging.getLogger()
logHandler = logging.StreamHandler(sys.stdout)
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
logger.setLevel(logging.INFO)

logger.info('Data load started', extra={
    'rows': 1000,
    'source': 'salesforce',
    'duration_sec': 42
})

# Structured logging with context
from structlog import get_logger

log = get_logger()
log.info('event_occurred', user_id='123', action='purchase', amount=99.99)
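A minimal sketch of a production structlog configuration (ISO timestamps plus JSON output), using standard structlog processors:

import structlog

structlog.configure(
    processors=[
        structlog.processors.add_log_level,           # include level in the event dict
        structlog.processors.TimeStamper(fmt='iso'),  # ISO-8601 timestamps
        structlog.processors.JSONRenderer(),          # one JSON object per log line
    ]
)
log = structlog.get_logger()
log.info('data_load_finished', rows=1000, duration_sec=42)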
Configuration
# Pydantic for settings
# (Pydantic v1 style; in v2, BaseSettings lives in the pydantic-settings package
#  and @validator becomes @field_validator)
from pydantic import BaseSettings, validator


class Settings(BaseSettings):
    database_url: str
    api_key: str
    debug: bool = False
    batch_size: int = 5000

    @validator('batch_size')
    def batch_size_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError('batch_size must be > 0')
        return v

    class Config:
        env_file = '.env'
        case_sensitive = False


settings = Settings()
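A short usage sketch; DATABASE_URL and API_KEY are assumed to be provided via the environment or the .env file:

# Values are read from environment variables or .env (DATABASE_URL, API_KEY, DEBUG, BATCH_SIZE)
from sqlalchemy import create_engine

engine = create_engine(settings.database_url)
if settings.debug:
    print(f'Debug mode, batch_size={settings.batch_size}')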
Development tools
# Code quality
black .                               # Auto-format code
flake8 src/                           # Linting
mypy src/                             # Type checking
pylint src/                           # Advanced linting

# Testing
pytest tests/ -v                      # Run tests
pytest --cov=src                      # Coverage report

# Pre-commit hooks
pre-commit install                    # Enforce code quality on every commit

# Documentation
sphinx-build docs docs/_build
pydoc -w src/domain                   # Generate HTML docs

# Profiling
kernprof -l -v script.py              # Line-by-line profiling (line_profiler)
python -m memory_profiler script.py   # Memory usage
Evolution over the years
| Period | Style | Framework | Python Version | Key Tools |
|---|---|---|---|---|
| 2010-2013 | Procedural | None | 2.7 | Pandas |
| 2013-2017 | OOP | Django | 3.4-3.5 | SQLAlchemy |
| 2017-2020 | Clean Code | FastAPI | 3.6-3.8 | Pydantic |
| 2020-2024 | Domain Driven | Async | 3.8+ | pytest, polars |
My current stack for Data Engineering:
- Languages: Python 3.11+
- ETL: Polars, PySpark
- DB: SQLAlchemy + raw SQL
- API: FastAPI (for pipelines)
- Testing: pytest + VCR.py
- Monitoring: structlog + Prometheus
- Orchestration: Airflow or Dagster (minimal DAG sketch below)
- IaC: Terraform
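A minimal Airflow DAG sketch for such a pipeline (Airflow 2.x API; the DAG id and task body are placeholders):

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator


def run_daily_load():
    # Placeholder: a real task would call e.g. the DataLoader from section 1
    ...


with DAG(
    dag_id='daily_customer_load',     # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule='@daily',                # schedule_interval in Airflow < 2.4
    catchup=False,
) as dag:
    load_task = PythonOperator(
        task_id='load',
        python_callable=run_daily_load,
    )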
I choose simplicity and clarity over cleverness. I write code so that a junior engineer can understand what is going on after a week on the project.