In what style did you write Python code and what technologies did you use

Python code, style, and technologies in my work

Over 10+ years I have evolved from messy spaghetti scripts to production-grade code. Here is my approach and the tools I use.

1. Coding style

From mess to clean code

# BEFORE (2010s): spaghetti
def load_data():
    import pandas
    import sqlalchemy
    # Everything in one function
    df = pandas.read_csv('data.csv')
    # No error handling
    conn = sqlalchemy.create_engine('postgresql://...')
    for idx, row in df.iterrows():
        # Slow: one INSERT per row
        conn.execute(f"INSERT INTO table VALUES ({row})")
    # No logging
    return True

# NOW: clean code
from dataclasses import dataclass
from typing import Iterator
import logging

logger = logging.getLogger(__name__)

@dataclass
class DataLoadConfig:
    """Configuration for data loading"""
    source_path: str
    target_schema: str
    batch_size: int = 5000
    validate: bool = True

class DataLoader:
    """Responsible for loading data from CSV to database"""
    
    def __init__(self, config: DataLoadConfig):
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)
    
    def load(self) -> dict[str, int]:
        """Load data with proper error handling and monitoring"""
        try:
            rows_loaded = 0
            rows_failed = 0
            
            for batch in self._read_batches():
                try:
                    if self.config.validate:
                        self._validate_batch(batch)
                    
                    self._insert_batch(batch)
                    rows_loaded += len(batch)
                    
                except ValueError as e:
                    rows_failed += len(batch)
                    self.logger.warning(f'Batch validation failed: {e}')
                    # Optionally: save to error table
                    self._save_to_error_table(batch, str(e))
            
            self.logger.info(f'Load completed: {rows_loaded} rows loaded, {rows_failed} failed')
            return {'loaded': rows_loaded, 'failed': rows_failed}
            
        except Exception as e:
            self.logger.error(f'Critical error during load: {e}', exc_info=True)
            raise
    
    def _read_batches(self) -> Iterator[list[dict]]:
        """Read CSV in batches to avoid memory overload"""
        import pandas as pd
        
        for chunk in pd.read_csv(
            self.config.source_path,
            chunksize=self.config.batch_size,
            dtype=str  # Avoid type inference issues
        ):
            yield chunk.to_dict('records')
    
    def _validate_batch(self, batch: list[dict]) -> None:
        """Validate batch before insertion"""
        for record in batch:
            if not record.get('id') or not record.get('name'):
                raise ValueError(f'Missing required fields in record: {record}')
    
    def _insert_batch(self, batch: list[dict]) -> None:
        """Insert batch using bulk operation"""
        from sqlalchemy import text
        from .db import get_connection
        
        with get_connection() as conn:
            # Bulk insert instead of row-by-row
            conn.execute(
                text(f"""
                    INSERT INTO {self.config.target_schema}.target_table 
                    (id, name, data) 
                    VALUES (:id, :name, :data)
                """),
                batch
            )
            conn.commit()
    
    def _save_to_error_table(self, batch: list[dict], reason: str) -> None:
        """Persist rejected rows so they can be inspected and replayed later"""
        # Implementation depends on the target database; stubbed here for brevity
        self.logger.debug(f'Quarantined {len(batch)} rows: {reason}')
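
Typical usage of this loader looks like the following sketch (file path and schema name are illustrative):

config = DataLoadConfig(source_path='input.csv', target_schema='staging')
loader = DataLoader(config)
stats = loader.load()
print(stats)  # {'loaded': ..., 'failed': ...}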

2. Architecture and patterns

Layered architecture (Clean Architecture)

# project_root/
# ├── src/
# │   ├── domain/              # Business logic
# │   │   ├── models.py        # Entities, Value Objects
# │   │   └── repositories.py  # Interfaces
# │   ├── application/         # Use cases, orchestration
# │   │   ├── services.py      # Business services
# │   │   └── use_cases.py     # User stories
# │   ├── infrastructure/      # External integrations
# │   │   ├── database.py      # DB implementations
# │   │   ├── api_client.py    # External APIs
# │   │   └── cache.py         # Caching layer
# │   └── presentation/        # CLI, HTTP, etc
# │       └── cli.py           # Command-line interface
# └── tests/

# Domain Layer (business logic, independent of implementation details)
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from enum import Enum

class CustomerStatus(str, Enum):
    ACTIVE = 'active'
    INACTIVE = 'inactive'
    CHURNED = 'churned'

@dataclass
class Customer:
    """Domain Entity — представляет бизнес-концепт"""
    id: str
    name: str
    email: str
    status: CustomerStatus
    created_at: datetime
    
    def is_active(self) -> bool:
        return self.status == CustomerStatus.ACTIVE
    
    def churn(self) -> None:
        """Business rule: activate churning"""
        self.status = CustomerStatus.CHURNED

class CustomerRepository(ABC):
    """Repository interface — как БД работает (не важно)"""
    
    @abstractmethod
    def get_by_id(self, customer_id: str) -> Customer:
        pass
    
    @abstractmethod
    def save(self, customer: Customer) -> None:
        pass

# Application Layer (use cases)
class ChurnCustomerUseCase:
    """Отмечает клиента как churned"""
    
    def __init__(self, repository: CustomerRepository):
        self.repository = repository
    
    def execute(self, customer_id: str) -> dict:
        customer = self.repository.get_by_id(customer_id)
        customer.churn()
        self.repository.save(customer)
        return {'status': 'churned', 'customer_id': customer_id}

# Infrastructure Layer (implementation details)
from sqlalchemy import Column, DateTime, String
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class CustomerModel(Base):
    """ORM model backing the Customer entity"""
    __tablename__ = 'customers'
    id = Column(String, primary_key=True)
    name = Column(String)
    email = Column(String)
    status = Column(String)
    created_at = Column(DateTime)

class CustomerRepositorySQL(CustomerRepository):
    """PostgreSQL implementation of repository"""
    
    def __init__(self, session: Session):
        self.session = session
    
    def get_by_id(self, customer_id: str) -> Customer:
        row = self.session.query(CustomerModel).filter(
            CustomerModel.id == customer_id
        ).first()
        
        if not row:
            raise ValueError(f'Customer {customer_id} not found')
        
        return Customer(
            id=row.id,
            name=row.name,
            email=row.email,
            status=CustomerStatus(row.status),
            created_at=row.created_at
        )
    
    def save(self, customer: Customer) -> None:
        self.session.query(CustomerModel).filter(
            CustomerModel.id == customer.id
        ).update({'status': customer.status.value})
        self.session.commit()
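
To make the wiring between the layers explicit, a minimal composition-root sketch might look like this (the connection string is a placeholder):

# Composition root: infrastructure is injected into the use case
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

engine = create_engine('postgresql://user:pass@localhost/db')

with Session(engine) as session:
    use_case = ChurnCustomerUseCase(CustomerRepositorySQL(session))
    result = use_case.execute('123')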

3. Specific technologies

Data Processing

# Pandas for small/medium data
import pandas as pd
import numpy as np

df = pd.read_csv('data.csv')
df['date'] = pd.to_datetime(df['date'])
df_grouped = df.groupby('category')['amount'].sum()

# PySpark for big data (> 5 GB)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum

spark = SparkSession.builder.appName('DataAnalysis').getOrCreate()
df_spark = spark.read.parquet('s3://bucket/data.parquet')
result = df_spark.groupBy('category').agg(spark_sum('amount')).collect()

# Polars for very fast processing (my current choice for ETL)
import polars as pl

df_polars = pl.read_csv('large_file.csv')
result = (
    df_polars
    .filter(pl.col('date') > '2024-01-01')
    .group_by('category')
    .agg(pl.col('amount').sum())
)

Database Access

# SQLAlchemy ORM (for simple operations)
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)

engine = create_engine('postgresql://user:pass@localhost/db')
Base.metadata.create_all(engine)

with Session(engine) as session:
    user = session.query(User).filter(User.id == 1).first()

# Raw SQL (for complex queries)
from sqlalchemy import text

with Session(engine) as session:
    result = session.execute(text("""
        SELECT u.name, COUNT(o.id) as order_count
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
        WHERE u.created_at > :cutoff_date
        GROUP BY u.id
    """), {'cutoff_date': '2024-01-01'})
    
    for row in result:
        print(f"{row.name}: {row.order_count} orders")

# SQLAlchemy Core (for ETL operations)
from sqlalchemy import insert, update, delete, select

# Bulk insert (fast: one statement instead of row-by-row)
insert_stmt = insert(User).values([
    {'name': 'Alice', 'email': 'alice@example.com'},
    {'name': 'Bob', 'email': 'bob@example.com'},
])
with Session(engine) as session:
    session.execute(insert_stmt)
    session.commit()

Async/Concurrent Processing

# asyncio for concurrent I/O-bound operations
import asyncio
import aiohttp

async def fetch_data_async(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

# multiprocessing for CPU-bound tasks
from multiprocessing import Pool
from functools import partial

def expensive_calculation(data, factor=2):
    return sum(data) * factor

# Illustrative input batches
chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]

with Pool(4) as pool:  # on Windows/macOS run this under `if __name__ == '__main__':`
    results = pool.map(
        partial(expensive_calculation, factor=3),
        chunks
    )

# concurrent.futures for mixed workloads
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(api_call, item) for item in items]
    results = [f.result() for f in futures]

Testing

# pytest for all tests
import pytest
from datetime import datetime
from unittest.mock import MagicMock

class TestChurnCustomerUseCase:
    @pytest.fixture
    def mock_repo(self):
        return MagicMock(spec=CustomerRepository)
    
    def test_churn_sets_status(self, mock_repo):
        # Arrange
        customer = Customer(
            id='123',
            name='John',
            email='john@example.com',
            status=CustomerStatus.ACTIVE,
            created_at=datetime.now()
        )
        mock_repo.get_by_id.return_value = customer
        
        use_case = ChurnCustomerUseCase(mock_repo)
        
        # Act
        result = use_case.execute('123')
        
        # Assert
        assert result['status'] == 'churned'
        mock_repo.save.assert_called_once()
        assert customer.status == CustomerStatus.CHURNED
    
    def test_churn_raises_on_not_found(self, mock_repo):
        mock_repo.get_by_id.side_effect = ValueError('Not found')
        use_case = ChurnCustomerUseCase(mock_repo)
        
        with pytest.raises(ValueError):
            use_case.execute('999')

# VCR.py for recording and replaying HTTP interactions
import vcr

@vcr.use_cassette('fixtures/vcr_cassettes/api_call.yaml')
def test_external_api_call():
    client = APIClient('https://api.example.com')
    response = client.get_data('resource')
    assert response['status'] == 'success'

Logging and Monitoring

import logging
import sys
from pythonjsonlogger import jsonlogger

# JSON logging for production
logger = logging.getLogger()
logHandler = logging.StreamHandler(sys.stdout)
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
logger.setLevel(logging.INFO)

logger.info('Data load started', extra={
    'rows': 1000,
    'source': 'salesforce',
    'duration_sec': 42
})

# Structured logging with context
from structlog import get_logger

log = get_logger()
log.info('event_occurred', user_id='123', action='purchase', amount=99.99)

Configuration

# Pydantic for settings (v1 style; in Pydantic v2, BaseSettings moved to the
# pydantic-settings package and @validator became @field_validator)
from pydantic import BaseSettings, validator

class Settings(BaseSettings):
    database_url: str
    api_key: str
    debug: bool = False
    batch_size: int = 5000
    
    @validator('batch_size')
    def batch_size_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError('batch_size must be > 0')
        return v
    
    class Config:
        env_file = '.env'
        case_sensitive = False

settings = Settings()

Development tools

# Code quality
black .                    # Auto-format code
flake8 src/               # Linting
mypy src/                 # Type checking
pylint src/               # Advanced linting

# Testing
pytest tests/ -v          # Run tests
pytest --cov=src          # Coverage report

# Pre-commit hooks
pre-commit install        # Enforce code quality

# Documentation
sphinx-build docs docs/_build
pydoc -w src/domain       # Generate HTML docs

# Profiling
kernprof -l -v script.py              # line_profiler: line-by-line profiling
python -m memory_profiler script.py   # Memory usage per line
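
A quick sketch of how a function gets instrumented for these profilers (script and function names are illustrative):

# profiling_demo.py (illustrative)
# memory_profiler:  python -m memory_profiler profiling_demo.py
# line_profiler:    remove the import below and run `kernprof -l -v profiling_demo.py`,
#                   which injects its own @profile decorator
from memory_profiler import profile

@profile
def build_report():
    squares = [i ** 2 for i in range(100_000)]
    return sum(squares)

if __name__ == '__main__':
    build_report()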

Evolution over the years

Period     | Style         | Framework | Python Version | Key Tools
2010-2013  | Procedural    | None      | 2.7            | Pandas
2013-2017  | OOP           | Django    | 3.4-3.5        | SQLAlchemy
2017-2020  | Clean Code    | FastAPI   | 3.6-3.8        | Pydantic
2020-2024  | Domain Driven | Async     | 3.8+           | pytest, polars

My current stack for Data Engineering:

  • Languages: Python 3.11+
  • ETL: Polars, PySpark
  • DB: SQLAlchemy + Raw SQL
  • API: FastAPI (for pipelines)
  • Testing: pytest + VCR.py
  • Monitoring: structlog + Prometheus (metrics sketch below)
  • Orchestration: Airflow or Dagster (DAG sketch below)
  • IaC: Terraform
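
For the Monitoring item, a minimal sketch of exposing ETL metrics with prometheus_client (the metric name and port are assumptions):

from prometheus_client import Counter, start_http_server

# Hypothetical metric for the load job
ROWS_LOADED = Counter('etl_rows_loaded_total', 'Rows loaded by the ETL job')

start_http_server(8000)  # expose /metrics for Prometheus to scrape
ROWS_LOADED.inc(5000)    # call this from the load loop with the batch size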
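
For the Orchestration item, a minimal sketch of scheduling the DataLoader from section 1 as an Airflow 2.4+ DAG (the dag_id, module path, file path, and schedule are assumptions):

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
# DataLoadConfig / DataLoader are the classes from section 1; module path is hypothetical
from src.etl.loader import DataLoadConfig, DataLoader

def run_load():
    config = DataLoadConfig(source_path='/data/incoming.csv', target_schema='staging')
    DataLoader(config).load()

with DAG(
    dag_id='daily_load',            # hypothetical DAG name
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:
    PythonOperator(task_id='load', python_callable=run_load)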

I choose simplicity and clarity over cleverness, and I write code so that a junior engineer can understand what is going on in it a week later.
