← Назад к вопросам

Какие Python-фреймворки использовал?

1.0 Junior🔥 241 комментариев
#Python

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Python-фреймворки для Data Engineer

During my career as Data Engineer with 10+ years of experience, I've worked with diverse Python frameworks and libraries. Let me break down the most relevant ones for data engineering tasks.

Web Frameworks (для APIs и сервисов)

FastAPI

Когда я использовал: REST API для ETL pipeline orchestration, data endpoints, webhook handlers

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class DataQuery(BaseModel):
    table: str
    filters: dict

@app.post("/query")
async def execute_query(query: DataQuery):
    try:
        result = db.fetch_data(query.table, query.filters)
        return {"status": "success", "data": result}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

# Достоинства: Автоматическая валидация, OpenAPI docs, async support
# Недостатки: Требует async/await для full performance

Flask

Когда я использовал: Простые микросервисы, webhook handlers, quick prototypes

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/api/sync', methods=['POST'])
def sync_data():
    data = request.json
    result = sync_pipeline(data['source'], data['target'])
    return jsonify({"status": "success", "rows": result})

# Достоинства: Простой, лёгкий, быстро на ногах
# Недостатки: Синхронный, не годится для высокой нагрузки

Django

Когда я использовал: Крупные проекты с ORM, админ-панель, управление данными

from django.db import models
from django.core.management.base import BaseCommand

class DataSource(models.Model):
    name = models.CharField(max_length=255)
    connection_string = models.CharField(max_length=1000)
    last_sync = models.DateTimeField(null=True)

class Command(BaseCommand):
    def handle(self, *args, **options):
        sources = DataSource.objects.filter(active=True)
        for source in sources:
            self.sync_source(source)

# Достоинства: ORM, миграции, админ, полнофункциональный фреймворк
# Недостатки: Тяжёлый, slow для простых задач

Data Processing (основной stack)

PySpark

Когда я использовал: Обработка больших данных (10GB+), distributed computing

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as spark_sum, col

spark = SparkSession.builder.appName("DataPipeline").getOrCreate()

# Читаем большой файл
df = spark.read.parquet("s3://bucket/data/events/*.parquet")

# Трансформации
result = df.filter(col("date") >= "2024-01-01") \
    .groupBy("user_id") \
    .agg(spark_sum("purchase_amount").alias("total_spent"))

result.write.mode("overwrite").parquet("s3://bucket/output/results")

# Достоинства: Масштабируемость, SQL возможности, distributed
# Недостатки: Overhead, медленен для малых данных, сложная отладка

Pandas

Когда я использовал: Работа с таблицами < 5GB, data cleaning, exploratory analysis

import pandas as pd
import numpy as np

# Чтение данных
df = pd.read_csv("sales.csv", parse_dates=['date'])

# Cleaning
df = df.dropna(subset=['customer_id'])
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')

# Aggregation
summary = df.groupby('region').agg({
    'amount': ['sum', 'mean', 'std'],
    'customer_id': 'count'
}).round(2)

# Export
summary.to_csv("output/summary.csv")

# Достоинства: Интуитивный, быстро писать, SQL-like groupby
# Недостатки: Весь датасет в памяти, медленен на больших данных

Polars

Когда я использовал: Замена pandas для больших данных, performance-critical операции

import polars as pl

# Чтение
df = pl.read_csv("sales.csv")

# Lazy evaluation для оптимизации
result = (df
    .lazy()
    .filter(pl.col("date") >= "2024-01-01")
    .groupby("region")
    .agg([
        pl.col("amount").sum().alias("total"),
        pl.col("amount").mean().alias("avg")
    ])
    .sort("total", descending=True)
    .collect()  # Выполнить
)

# Достоинства: Быстрее pandas в 5-10 раз, меньше памяти, лучше API
# Недостатки: Молодой, меньше интеграций

Dask

Когда я использовал: Параллельная обработка, работа с данными больше памяти

import dask.dataframe as dd

# Чтение всех парт параллельно
df = dd.read_csv("data/*.csv")

# Трансформации (ленивые)
result = df[df['amount'] > 1000].groupby('region')['amount'].mean()

# Вычисление (параллельно)
result = result.compute()

# Достоинства: Параллелизм, знакомый API как pandas
# Недостатки: Медленнее Spark для очень больших данных

Databases & ORM

SQLAlchemy

Когда я использовал: Работа с SQL БД, ORM queries, миграции

from sqlalchemy import create_engine, Column, String, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    email = Column(String(255), unique=True)
    created_at = Column(DateTime, default=datetime.utcnow)

engine = create_engine('postgresql://user:pass@localhost/db')
Session = sessionmaker(bind=engine)
session = Session()

# Queries
active_users = session.query(User).filter(User.created_at >= '2024-01-01').all()

# Достоинства: Type-safe, миграции через Alembic, relationships
# Недостатки: Медленнее raw SQL, сложен для complex queries

SQLModel

Когда я использовал: FastAPI + БД интеграция с типизацией

from sqlmodel import SQLModel, Field, Session, select
from typing import Optional

class DataSource(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    connection_string: str
    is_active: bool = True

with Session(engine) as session:
    sources = session.exec(select(DataSource).where(DataSource.is_active)).all()

# Достоинства: Одна модель для API и БД, типизация
# Недостатки: Молодой, меньше features, чем SQLAlchemy

psycopg2 (raw SQL)

Когда я использовал: High-performance queries, complex SQL

import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect("dbname=mydb user=user password=pass")
cur = conn.cursor()

# Batch insert
data = [(1, 'Alice'), (2, 'Bob')]
execute_values(cur, 
    "INSERT INTO users (id, name) VALUES %s",
    data)

conn.commit()

# Достоинства: Максимальная скорость, control
# Недостатки: Нет абстракции, SQL injection риск если не осторожен

ETL & Orchestration

Apache Airflow

Когда я использовал: Сложные DAGs (directed acyclic graphs), scheduling

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_eng',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='0 2 * * *')

def extract_data(**context):
    # Fetch data from source
    return {"rows": 1000}

def transform_data(**context):
    ti = context['task_instance']
    rows = ti.xcom_pull(task_ids='extract')
    # Transform
    return {"cleaned_rows": rows - 50}

extract = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)

extract >> transform

# Достоинства: Powerful, множество интеграций, monitoring
# Недостатки: Сложен в setup, overhead для простых задач

Luigi

Когда я использовал: Simplistic pipelines, быстрые скрипты

import luigi

class FetchData(luigi.Task):
    def output(self):
        return luigi.LocalTarget('data.csv')
    
    def run(self):
        data = fetch_from_api()
        with self.output().open('w') as f:
            f.write(data)

class ProcessData(luigi.Task):
    def requires(self):
        return FetchData()
    
    def output(self):
        return luigi.LocalTarget('processed.csv')
    
    def run(self):
        # Read from FetchData output
        with self.input().open('r') as f:
            data = f.read()
        # Process and write

if __name__ == '__main__':
    luigi.build([ProcessData()], local_scheduler=True)

# Достоинства: Простой, интуитивный
# Недостатки: Менее scalable, нет UI

Prefect

Когда я использовал: Modern orchestration, лучше than Airflow UX

from prefect import flow, task

@task
def extract(source: str):
    return fetch_data(source)

@task
def transform(data):
    return clean_and_process(data)

@task
def load(data, target: str):
    save_data(target, data)

@flow
def etl_flow():
    data = extract("database")
    transformed = transform(data)
    load(transformed, "warehouse")

etl_flow()

# Достоинства: Простой синтаксис, хороший UI, параллелизм
# Недостатки: Молодой, меньше интеграций

Data Validation

Great Expectations

Когда я использовал: Data quality checks, expectations validation

from great_expectations.dataset import SparkDFDataset

df_test = SparkDFDataset(spark_df)

df_test.expect_column_values_to_be_in_set(
    column="status",
    value_set=["active", "inactive"]
)

df_test.expect_column_values_to_be_of_type(
    column="amount",
    type_="int64"
)

result = df_test.validate()

# Достоинства: Comprehensive, integrations, easy assertions
# Недостатки: Overhead, steep learning curve

Pydantic

Когда я использовал: Валидация данных, type safety

from pydantic import BaseModel, validator, Field

class Event(BaseModel):
    user_id: int
    event_type: str
    timestamp: datetime
    amount: float = Field(gt=0)  # greater than 0
    
    @validator('event_type')
    def validate_type(cls, v):
        allowed = ['purchase', 'login', 'logout']
        if v not in allowed:
            raise ValueError(f'Invalid: {v}')
        return v

event = Event(
    user_id=123,
    event_type='purchase',
    timestamp=datetime.now(),
    amount=99.99
)

# Достоинства: Строгая типизация, валидация, JSON serialization
# Недостатки: Runtime overhead для большого量 объектов

Summary

Стек, который я использовал чаще всего:

  1. Data Processing: PySpark (big data) + Pandas (small data)
  2. APIs: FastAPI + SQLAlchemy
  3. Orchestration: Airflow (large projects) + Luigi (small tasks)
  4. Validation: Pydantic + Great Expectations
  5. Databases: PostgreSQL + SQLAlchemy ORM

Выбор зависит от:

  • Размера данных (Pandas < 5GB, PySpark > 5GB)
  • Complexity (Luigi для простого, Airflow для complex)
  • Performance требований (raw SQL > ORM)
  • Team expertise (используй то, что знаешь)

Лучше практики:

  • Используй правильный инструмент для работы
  • Не переусложняй, если можно проще
  • Всегда валидируй данные
  • Мониторь и логируй
  • Пиши тесты (pytest)
Какие Python-фреймворки использовал? | PrepBro