What I did with Python
In Python, I worked across the full range of Data Engineering tasks.
1. ETL/ELT Pipelines
Building data-processing pipelines to migrate data from various sources:
import pandas as pd
from sqlalchemy import create_engine
import logging

class DataPipeline:
    def __init__(self, source_conn, target_conn):
        self.source = source_conn
        self.target = target_conn
        self.logger = logging.getLogger(__name__)

    def extract(self, query):
        try:
            df = pd.read_sql(query, self.source)
            self.logger.info(f"Extracted {len(df)} rows")
            return df
        except Exception as e:
            self.logger.error(f"Extraction failed: {e}")
            raise

    def transform(self, df):
        # Normalize dates, drop duplicate ids, keep only positive amounts
        df['created_date'] = pd.to_datetime(df['created_date'])
        df = df.drop_duplicates(subset=['id'])
        df = df[df['amount'] > 0]
        return df

    def load(self, df, table_name):
        df.to_sql(table_name, self.target, if_exists='append', index=False)
        self.logger.info(f"Loaded {len(df)} rows to {table_name}")

# Usage
source_engine = create_engine('postgresql://user:pass@source:5432/db')
target_engine = create_engine('postgresql://user:pass@target:5432/db')

pipeline = DataPipeline(source_engine, target_engine)
data = pipeline.extract("SELECT * FROM orders WHERE created_date > '2024-01-01'")
cleaned = pipeline.transform(data)
pipeline.load(cleaned, 'orders_processed')
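For tables too large to fit in memory, the same extract step can stream data in chunks; a minimal sketch (the chunk size is illustrative) using the chunksize argument of pd.read_sql:

def extract_in_chunks(query, engine, chunksize=50_000):
    # With chunksize set, pd.read_sql yields DataFrames instead of one big frame
    for chunk in pd.read_sql(query, engine, chunksize=chunksize):
        yield chunk

for chunk in extract_in_chunks("SELECT * FROM orders", source_engine):
    pipeline.load(pipeline.transform(chunk), 'orders_processed')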
2. Data Cleaning and Validation
Data cleaning with Pandas and Pydantic:
from pydantic import BaseModel, ValidationError, validator

class Order(BaseModel):
    order_id: str
    amount: float
    status: str

    @validator('amount')
    def validate_amount(cls, v):
        if v < 0:
            raise ValueError('Amount cannot be negative')
        return round(v, 2)

    @validator('status')
    def validate_status(cls, v):
        if v not in ['pending', 'completed', 'cancelled']:
            raise ValueError('Invalid status')
        return v

# Validate a DataFrame row by row; collect bad indices first, because
# dropping rows while iterating over the same frame is unsafe
def clean_dataframe(df):
    invalid_idx = []
    for idx, row in df.iterrows():
        try:
            Order(**row.to_dict())
        except ValidationError:
            invalid_idx.append(idx)
    return df.drop(index=invalid_idx)
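A quick usage sketch with made-up rows (assuming pandas is imported as pd, as above), showing that invalid records are filtered out:

raw = pd.DataFrame([
    {'order_id': 'A1', 'amount': 10.50, 'status': 'completed'},
    {'order_id': 'A2', 'amount': -3.00, 'status': 'pending'},   # negative amount
    {'order_id': 'A3', 'amount': 7.25, 'status': 'unknown'},    # invalid status
])
clean = clean_dataframe(raw)  # only 'A1' survives validation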
3. Big data processing with Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as spark_sum

spark = SparkSession.builder.appName("BigDataProcess").getOrCreate()

# Read data
df = spark.read.parquet("s3://data/events/")

# Transform
processed = (
    df.filter(col("timestamp") > "2024-01-01")
      .groupBy("user_id")
      .agg(spark_sum("amount").alias("total_spent"))
)

# Save
processed.write.parquet("s3://data/processed/", mode="overwrite")
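The when import above is useful for conditional columns; a minimal sketch (the threshold and tier names are illustrative) of labeling events before aggregating:

# Hypothetical enrichment: bucket events by spend before grouping
labeled = df.withColumn(
    "tier",
    when(col("amount") >= 100, "high").otherwise("regular")
)
labeled.groupBy("tier").count().show()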
4. Working with APIs
Integrating with external APIs to fetch data:
import requests

class APIClient:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def fetch_paginated(self, endpoint, page_size=100):
        # Walk page by page until a non-200 response or an empty page
        all_data = []
        page = 1
        while True:
            params = {"page": page, "limit": page_size}
            response = requests.get(
                f"{self.base_url}/{endpoint}",
                headers=self.headers,
                params=params,
            )
            if response.status_code != 200:
                break
            data = response.json()
            if not data:
                break
            all_data.extend(data)
            page += 1
        return all_data

# Usage
client = APIClient("https://api.example.com", "secret-key")
users = client.fetch_paginated("users")
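For flaky endpoints it is common to add automatic retries; a minimal sketch using requests' standard Retry adapter (the retry counts and status codes are illustrative):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session():
    # Retry transient failures with exponential backoff: 1s, 2s, 4s
    retry = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    session = requests.Session()
    session.mount("https://", HTTPAdapter(max_retries=retry))
    return session

Such a session can then be used inside APIClient in place of the module-level requests calls.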
5. Airflow DAGs
Orchestrating complex workflows:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'data_pipeline',
    default_args=default_args,
    start_date=datetime(2024, 1, 1),  # required; a DAG without it won't schedule
    schedule_interval='@daily',
    catchup=False,
) as dag:

    def extract_data():
        # Extract
        pass

    def transform_data():
        # Transform
        pass

    def load_data():
        # Load
        pass

    extract = PythonOperator(task_id='extract', python_callable=extract_data)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    load = PythonOperator(task_id='load', python_callable=load_data)

    extract >> transform >> load
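For local debugging, Airflow 2.5+ can run the whole DAG in-process without a scheduler; a small sketch appended to the DAG file:

if __name__ == "__main__":
    # Executes extract >> transform >> load for one logical date
    dag.test()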
6. Data analysis and computation
import pandas as pd
import numpy as np
from scipy import stats

# RFM analysis
def rfm_analysis(orders_df):
    rfm = orders_df.groupby('customer_id').agg({
        'order_date': 'max',
        'order_id': 'count',
        'amount': 'sum'
    })
    rfm.columns = ['LastPurchase', 'Frequency', 'MonetaryValue']
    today = pd.Timestamp.now()
    rfm['Recency'] = (today - rfm['LastPurchase']).dt.days

    # Segmentation
    rfm['RScore'] = pd.qcut(rfm['Recency'], 5, labels=range(5, 0, -1), duplicates='drop')
    rfm['FScore'] = pd.qcut(rfm['Frequency'], 5, labels=range(1, 6), duplicates='drop')
    rfm['MScore'] = pd.qcut(rfm['MonetaryValue'], 5, labels=range(1, 6), duplicates='drop')
    rfm['RFMSegment'] = (
        rfm['RScore'].astype(str) + rfm['FScore'].astype(str) + rfm['MScore'].astype(str)
    )
    return rfm

# A/B test analysis
def ab_test_stats(control, treatment, alpha=0.05):
    t_stat, p_value = stats.ttest_ind(control, treatment)
    return {
        'mean_control': control.mean(),
        'mean_treatment': treatment.mean(),
        'lift': (treatment.mean() - control.mean()) / control.mean() * 100,
        'p_value': p_value,
        'significant': p_value < alpha,
    }
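A minimal check of ab_test_stats on synthetic data (the distributions are invented for illustration):

rng = np.random.default_rng(42)
control = pd.Series(rng.normal(100, 15, size=1000))
treatment = pd.Series(rng.normal(103, 15, size=1000))

result = ab_test_stats(control, treatment)
print(result['lift'], result['p_value'], result['significant'])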
7. Data Quality Framework
import pandas as pd

class DataQualityChecker:
    def __init__(self, df):
        self.df = df
        self.report = {}

    def check_nulls(self, threshold=0.05):
        # Flag columns whose null ratio exceeds the threshold
        null_ratios = self.df.isnull().sum() / len(self.df)
        self.report['nulls'] = null_ratios[null_ratios > threshold].to_dict()
        return self

    def check_duplicates(self):
        self.report['duplicates'] = self.df.duplicated().sum()
        return self

    def check_outliers(self, column, std_threshold=3):
        # Count values more than std_threshold standard deviations from the mean;
        # setdefault keeps earlier results when checking several columns
        mean = self.df[column].mean()
        std = self.df[column].std()
        outliers = ((self.df[column] - mean).abs() > std_threshold * std).sum()
        self.report.setdefault('outliers', {})[column] = outliers
        return self

    def generate_report(self):
        return self.report

# Usage
checker = DataQualityChecker(df)
report = checker.check_nulls().check_duplicates().check_outliers('amount').generate_report()
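In a pipeline this report usually gates the load step; a small sketch (the failure policy is illustrative) of turning findings into a hard stop:

def assert_quality(report):
    problems = {}
    for check, result in report.items():
        # Dict-valued checks are per-column; keep only nonzero entries
        if isinstance(result, dict):
            bad = {k: v for k, v in result.items() if v}
            if bad:
                problems[check] = bad
        elif result:
            problems[check] = result
    if problems:
        raise ValueError(f"Data quality checks failed: {problems}")

assert_quality(report)  # raises before bad data reaches downstream tables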
8. Working with unstructured data
import json
import xml.etree.ElementTree as ET
import pandas as pd

# JSON Lines processing
def process_json_lines(file_path):
    data = []
    with open(file_path) as f:
        for line in f:
            data.append(json.loads(line))
    return pd.DataFrame(data)

# CSV files with unknown encodings; catch only decode errors so that
# real problems (e.g. a missing file) are not silently swallowed
def read_csv_safe(file_path):
    for encoding in ['utf-8', 'latin-1', 'cp1252']:
        try:
            return pd.read_csv(file_path, encoding=encoding)
        except UnicodeDecodeError:
            continue
    raise ValueError("Could not read file")

# XML parsing
def parse_xml(file_path):
    tree = ET.parse(file_path)
    root = tree.getroot()
    records = []
    for item in root.findall('.//record'):
        record = {child.tag: child.text for child in item}
        records.append(record)
    return pd.DataFrame(records)
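For nested JSON, pandas' json_normalize flattens records into columns; a minimal sketch with an invented payload:

# Hypothetical nested record, e.g. one line of an events feed
record = {"id": 1, "user": {"name": "Ann", "city": "Berlin"}}

flat = pd.json_normalize(record, sep='_')
print(flat.columns.tolist())  # ['id', 'user_name', 'user_city']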
9. Monitoring and logging
import json
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class PipelineLogger:
    def __init__(self, name):
        self.logger = logging.getLogger(name)

    def log_metrics(self, **kwargs):
        # Emit metrics as one JSON payload so log aggregators can parse them
        self.logger.info(f"Metrics: {json.dumps(kwargs)}")

    def log_error(self, error, context=None):
        self.logger.error(f"Error: {error}, Context: {context}")
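A quick usage sketch (the metric names and error are invented):

plog = PipelineLogger("orders_pipeline")
plog.log_metrics(rows_extracted=10_000, rows_loaded=9_950, duration_sec=42.3)

try:
    raise RuntimeError("target DB unreachable")
except RuntimeError as e:
    plog.log_error(e, context={"table": "orders_processed"})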
Results
- 100GB+ of data processed daily
- 99.5% uptime for critical pipelines
- Development time reduced by 40%
- 80% of manual operations automated