← Назад к вопросам

Чем занимался на Python?

1.0 Junior🔥 201 комментариев
#Python

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Чем занимался на Python

На Python я работал с полным спектром Data Engineering задач.

1. ETL/ELT Pipelines

Разработка pipeline обработки данных для миграции из различных источников:

import pandas as pd
from sqlalchemy import create_engine
import logging

class DataPipeline:
    def __init__(self, source_conn, target_conn):
        self.source = source_conn
        self.target = target_conn
        self.logger = logging.getLogger(__name__)
    
    def extract(self, query):
        try:
            df = pd.read_sql(query, self.source)
            self.logger.info(f"Extracted {len(df)} rows")
            return df
        except Exception as e:
            self.logger.error(f"Extraction failed: {e}")
            raise
    
    def transform(self, df):
        df['created_date'] = pd.to_datetime(df['created_date'])
        df = df.drop_duplicates(subset=['id'])
        df = df[df['amount'] > 0]
        return df
    
    def load(self, df, table_name):
        df.to_sql(table_name, self.target, if_exists='append', index=False)
        self.logger.info(f"Loaded {len(df)} rows to {table_name}")

# Использование
source_engine = create_engine('postgresql://user:pass@source:5432/db')
target_engine = create_engine('postgresql://user:pass@target:5432/db')

pipeline = DataPipeline(source_engine, target_engine)
data = pipeline.extract("SELECT * FROM orders WHERE created_date > '2024-01-01'")
cleaned = pipeline.transform(data)
pipeline.load(cleaned, 'orders_processed')

2. Data Cleaning и Validation

Очистка данных с использованием Pandas и Pydantic:

from pydantic import BaseModel, validator
import numpy as np

class Order(BaseModel):
    order_id: str
    amount: float
    status: str
    
    @validator('amount')
    def validate_amount(cls, v):
        if v < 0:
            raise ValueError('Amount cannot be negative')
        return round(v, 2)
    
    @validator('status')
    def validate_status(cls, v):
        if v not in ['pending', 'completed', 'cancelled']:
            raise ValueError('Invalid status')
        return v

# Обработка DataFrame
def clean_dataframe(df):
    for idx, row in df.iterrows():
        try:
            Order(**row.to_dict())
        except ValueError as e:
            df.drop(idx, inplace=True)
    return df

3. Spark обработка больших данных

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as spark_sum

spark = SparkSession.builder.appName("BigDataProcess").getOrCreate()

# Чтение данных
df = spark.read.parquet("s3://data/events/")

# Трансформация
processed = df \
    .filter(col("timestamp") > "2024-01-01") \
    .groupBy("user_id") \
    .agg(spark_sum("amount").alias("total_spent"))

# Сохранение
processed.write.parquet("s3://data/processed/", mode="overwrite")

4. Работа с API

Интеграция с внешними API для получения данных:

import requests
import json
from datetime import datetime, timedelta

class APIClient:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {api_key}"}
    
    def fetch_paginated(self, endpoint, page_size=100):
        all_data = []
        page = 1
        
        while True:
            params = {"page": page, "limit": page_size}
            response = requests.get(
                f"{self.base_url}/{endpoint}",
                headers=self.headers,
                params=params
            )
            
            if response.status_code != 200:
                break
            
            data = response.json()
            if not data:
                break
            
            all_data.extend(data)
            page += 1
        
        return all_data

# Использование
client = APIClient("https://api.example.com", "secret-key")
users = client.fetch_paginated("users")

5. Airflow DAGs

Оркестрация сложных workflow:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

with DAG('data_pipeline', default_args=default_args, schedule_interval='@daily') as dag:
    
    def extract_data():
        # Извлечение
        pass
    
    def transform_data():
        # Трансформация
        pass
    
    def load_data():
        # Загрузка
        pass
    
    extract = PythonOperator(task_id='extract', python_callable=extract_data)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    load = PythonOperator(task_id='load', python_callable=load_data)
    
    extract >> transform >> load

6. Анализ данных и вычисления

import pandas as pd
import numpy as np
from scipy import stats

# RFM анализ
def rfm_analysis(orders_df):
    rfm = orders_df.groupby('customer_id').agg({
        'order_date': 'max',
        'order_id': 'count',
        'amount': 'sum'
    })
    
    rfm.columns = ['LastPurchase', 'Frequency', 'MonetaryValue']
    today = pd.Timestamp.now()
    rfm['Recency'] = (today - rfm['LastPurchase']).dt.days
    
    # Segmentation
    rfm['RScore'] = pd.qcut(rfm['Recency'], 5, labels=range(5, 0, -1), duplicates='drop')
    rfm['FScore'] = pd.qcut(rfm['Frequency'], 5, labels=range(1, 6), duplicates='drop')
    rfm['MScore'] = pd.qcut(rfm['MonetaryValue'], 5, labels=range(1, 6), duplicates='drop')
    
    rfm['RFMSegment'] = rfm['RScore'].astype(str) + rfm['FScore'].astype(str) + rfm['MScore'].astype(str)
    
    return rfm

# A/B тест анализ
def ab_test_stats(control, treatment, alpha=0.05):
    t_stat, p_value = stats.ttest_ind(control, treatment)
    
    return {
        'mean_control': control.mean(),
        'mean_treatment': treatment.mean(),
        'lift': ((treatment.mean() - control.mean()) / control.mean() * 100),
        'p_value': p_value,
        'significant': p_value < alpha
    }

7. Data Quality Framework

import pandas as pd

class DataQualityChecker:
    def __init__(self, df):
        self.df = df
        self.report = {}
    
    def check_nulls(self, threshold=0.05):
        null_ratios = self.df.isnull().sum() / len(self.df)
        self.report['nulls'] = null_ratios[null_ratios > threshold].to_dict()
        return self
    
    def check_duplicates(self):
        dup_count = self.df.duplicated().sum()
        self.report['duplicates'] = dup_count
        return self
    
    def check_outliers(self, column, std_threshold=3):
        mean = self.df[column].mean()
        std = self.df[column].std()
        outliers = ((self.df[column] - mean).abs() > std_threshold * std).sum()
        self.report['outliers'] = {column: outliers}
        return self
    
    def generate_report(self):
        return self.report

# Использование
checker = DataQualityChecker(df)
report = checker.check_nulls().check_duplicates().check_outliers('amount').generate_report()

8. Работа с неструктурированными данными

import json
import xml.etree.ElementTree as ET

# JSON обработка
def process_json_lines(file_path):
    data = []
    with open(file_path) as f:
        for line in f:
            data.append(json.loads(line))
    return pd.DataFrame(data)

# CSV с различными кодировками
def read_csv_safe(file_path):
    for encoding in ['utf-8', 'latin-1', 'cp1252']:
        try:
            return pd.read_csv(file_path, encoding=encoding)
        except:
            continue
    raise ValueError("Could not read file")

# XML парсинг
def parse_xml(file_path):
    tree = ET.parse(file_path)
    root = tree.getroot()
    records = []
    for item in root.findall('.//record'):
        record = {child.tag: child.text for child in item}
        records.append(record)
    return pd.DataFrame(records)

9. Мониторинг и логирование

import logging
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

class PipelineLogger:
    def __init__(self, name):
        self.logger = logging.getLogger(name)
    
    def log_metrics(self, **kwargs):
        self.logger.info(f"Metrics: {json.dumps(kwargs)}")
    
    def log_error(self, error, context=None):
        self.logger.error(f"Error: {error}, Context: {context}")

Результаты

  • Обработка 100GB+ данных ежедневно
  • 99.5% uptime для critical pipeline
  • Сокращение времени разработки на 40%
  • Автоматизация 80% ручных операций