Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Python-фреймворки для Data Engineer
During my career as Data Engineer with 10+ years of experience, I've worked with diverse Python frameworks and libraries. Let me break down the most relevant ones for data engineering tasks.
Web Frameworks (для APIs и сервисов)
FastAPI
Когда я использовал: REST API для ETL pipeline orchestration, data endpoints, webhook handlers
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
class DataQuery(BaseModel):
table: str
filters: dict
@app.post("/query")
async def execute_query(query: DataQuery):
try:
result = db.fetch_data(query.table, query.filters)
return {"status": "success", "data": result}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
# Достоинства: Автоматическая валидация, OpenAPI docs, async support
# Недостатки: Требует async/await для full performance
Flask
Когда я использовал: Простые микросервисы, webhook handlers, quick prototypes
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/api/sync', methods=['POST'])
def sync_data():
data = request.json
result = sync_pipeline(data['source'], data['target'])
return jsonify({"status": "success", "rows": result})
# Достоинства: Простой, лёгкий, быстро на ногах
# Недостатки: Синхронный, не годится для высокой нагрузки
Django
Когда я использовал: Крупные проекты с ORM, админ-панель, управление данными
from django.db import models
from django.core.management.base import BaseCommand
class DataSource(models.Model):
name = models.CharField(max_length=255)
connection_string = models.CharField(max_length=1000)
last_sync = models.DateTimeField(null=True)
class Command(BaseCommand):
def handle(self, *args, **options):
sources = DataSource.objects.filter(active=True)
for source in sources:
self.sync_source(source)
# Достоинства: ORM, миграции, админ, полнофункциональный фреймворк
# Недостатки: Тяжёлый, slow для простых задач
Data Processing (основной stack)
PySpark
Когда я использовал: Обработка больших данных (10GB+), distributed computing
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as spark_sum, col
spark = SparkSession.builder.appName("DataPipeline").getOrCreate()
# Читаем большой файл
df = spark.read.parquet("s3://bucket/data/events/*.parquet")
# Трансформации
result = df.filter(col("date") >= "2024-01-01") \
.groupBy("user_id") \
.agg(spark_sum("purchase_amount").alias("total_spent"))
result.write.mode("overwrite").parquet("s3://bucket/output/results")
# Достоинства: Масштабируемость, SQL возможности, distributed
# Недостатки: Overhead, медленен для малых данных, сложная отладка
Pandas
Когда я использовал: Работа с таблицами < 5GB, data cleaning, exploratory analysis
import pandas as pd
import numpy as np
# Чтение данных
df = pd.read_csv("sales.csv", parse_dates=['date'])
# Cleaning
df = df.dropna(subset=['customer_id'])
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
# Aggregation
summary = df.groupby('region').agg({
'amount': ['sum', 'mean', 'std'],
'customer_id': 'count'
}).round(2)
# Export
summary.to_csv("output/summary.csv")
# Достоинства: Интуитивный, быстро писать, SQL-like groupby
# Недостатки: Весь датасет в памяти, медленен на больших данных
Polars
Когда я использовал: Замена pandas для больших данных, performance-critical операции
import polars as pl
# Чтение
df = pl.read_csv("sales.csv")
# Lazy evaluation для оптимизации
result = (df
.lazy()
.filter(pl.col("date") >= "2024-01-01")
.groupby("region")
.agg([
pl.col("amount").sum().alias("total"),
pl.col("amount").mean().alias("avg")
])
.sort("total", descending=True)
.collect() # Выполнить
)
# Достоинства: Быстрее pandas в 5-10 раз, меньше памяти, лучше API
# Недостатки: Молодой, меньше интеграций
Dask
Когда я использовал: Параллельная обработка, работа с данными больше памяти
import dask.dataframe as dd
# Чтение всех парт параллельно
df = dd.read_csv("data/*.csv")
# Трансформации (ленивые)
result = df[df['amount'] > 1000].groupby('region')['amount'].mean()
# Вычисление (параллельно)
result = result.compute()
# Достоинства: Параллелизм, знакомый API как pandas
# Недостатки: Медленнее Spark для очень больших данных
Databases & ORM
SQLAlchemy
Когда я использовал: Работа с SQL БД, ORM queries, миграции
from sqlalchemy import create_engine, Column, String, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
email = Column(String(255), unique=True)
created_at = Column(DateTime, default=datetime.utcnow)
engine = create_engine('postgresql://user:pass@localhost/db')
Session = sessionmaker(bind=engine)
session = Session()
# Queries
active_users = session.query(User).filter(User.created_at >= '2024-01-01').all()
# Достоинства: Type-safe, миграции через Alembic, relationships
# Недостатки: Медленнее raw SQL, сложен для complex queries
SQLModel
Когда я использовал: FastAPI + БД интеграция с типизацией
from sqlmodel import SQLModel, Field, Session, select
from typing import Optional
class DataSource(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str
connection_string: str
is_active: bool = True
with Session(engine) as session:
sources = session.exec(select(DataSource).where(DataSource.is_active)).all()
# Достоинства: Одна модель для API и БД, типизация
# Недостатки: Молодой, меньше features, чем SQLAlchemy
psycopg2 (raw SQL)
Когда я использовал: High-performance queries, complex SQL
import psycopg2
from psycopg2.extras import execute_values
conn = psycopg2.connect("dbname=mydb user=user password=pass")
cur = conn.cursor()
# Batch insert
data = [(1, 'Alice'), (2, 'Bob')]
execute_values(cur,
"INSERT INTO users (id, name) VALUES %s",
data)
conn.commit()
# Достоинства: Максимальная скорость, control
# Недостатки: Нет абстракции, SQL injection риск если не осторожен
ETL & Orchestration
Apache Airflow
Когда я использовал: Сложные DAGs (directed acyclic graphs), scheduling
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data_eng',
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='0 2 * * *')
def extract_data(**context):
# Fetch data from source
return {"rows": 1000}
def transform_data(**context):
ti = context['task_instance']
rows = ti.xcom_pull(task_ids='extract')
# Transform
return {"cleaned_rows": rows - 50}
extract = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
extract >> transform
# Достоинства: Powerful, множество интеграций, monitoring
# Недостатки: Сложен в setup, overhead для простых задач
Luigi
Когда я использовал: Simplistic pipelines, быстрые скрипты
import luigi
class FetchData(luigi.Task):
def output(self):
return luigi.LocalTarget('data.csv')
def run(self):
data = fetch_from_api()
with self.output().open('w') as f:
f.write(data)
class ProcessData(luigi.Task):
def requires(self):
return FetchData()
def output(self):
return luigi.LocalTarget('processed.csv')
def run(self):
# Read from FetchData output
with self.input().open('r') as f:
data = f.read()
# Process and write
if __name__ == '__main__':
luigi.build([ProcessData()], local_scheduler=True)
# Достоинства: Простой, интуитивный
# Недостатки: Менее scalable, нет UI
Prefect
Когда я использовал: Modern orchestration, лучше than Airflow UX
from prefect import flow, task
@task
def extract(source: str):
return fetch_data(source)
@task
def transform(data):
return clean_and_process(data)
@task
def load(data, target: str):
save_data(target, data)
@flow
def etl_flow():
data = extract("database")
transformed = transform(data)
load(transformed, "warehouse")
etl_flow()
# Достоинства: Простой синтаксис, хороший UI, параллелизм
# Недостатки: Молодой, меньше интеграций
Data Validation
Great Expectations
Когда я использовал: Data quality checks, expectations validation
from great_expectations.dataset import SparkDFDataset
df_test = SparkDFDataset(spark_df)
df_test.expect_column_values_to_be_in_set(
column="status",
value_set=["active", "inactive"]
)
df_test.expect_column_values_to_be_of_type(
column="amount",
type_="int64"
)
result = df_test.validate()
# Достоинства: Comprehensive, integrations, easy assertions
# Недостатки: Overhead, steep learning curve
Pydantic
Когда я использовал: Валидация данных, type safety
from pydantic import BaseModel, validator, Field
class Event(BaseModel):
user_id: int
event_type: str
timestamp: datetime
amount: float = Field(gt=0) # greater than 0
@validator('event_type')
def validate_type(cls, v):
allowed = ['purchase', 'login', 'logout']
if v not in allowed:
raise ValueError(f'Invalid: {v}')
return v
event = Event(
user_id=123,
event_type='purchase',
timestamp=datetime.now(),
amount=99.99
)
# Достоинства: Строгая типизация, валидация, JSON serialization
# Недостатки: Runtime overhead для большого量 объектов
Summary
Стек, который я использовал чаще всего:
- Data Processing: PySpark (big data) + Pandas (small data)
- APIs: FastAPI + SQLAlchemy
- Orchestration: Airflow (large projects) + Luigi (small tasks)
- Validation: Pydantic + Great Expectations
- Databases: PostgreSQL + SQLAlchemy ORM
Выбор зависит от:
- Размера данных (Pandas < 5GB, PySpark > 5GB)
- Complexity (Luigi для простого, Airflow для complex)
- Performance требований (raw SQL > ORM)
- Team expertise (используй то, что знаешь)
Лучше практики:
- Используй правильный инструмент для работы
- Не переусложняй, если можно проще
- Всегда валидируй данные
- Мониторь и логируй
- Пиши тесты (pytest)