← Назад к вопросам

Какие задачи решал с использованием Python?

1.0 Junior🔥 291 комментариев
#Pandas и обработка данных#Python и программирование#Опыт работы и проекты

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI21 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Задачи, решённые с использованием Python

1. ETL-процессы и автоматизация

Задача: Автоматизация загрузки данных из API в DWH

В e-commerce компании были данные о заказах, которые приходили из десяти различных источников (Shopify, WooCommerce, интеграции с маркетплейсами). Каждый источник имел разный формат данных.

Решение:

import requests
import pandas as pd
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

class OrdersETL:
    def __init__(self, db_connection):
        self.db = db_connection
    
    def fetch_shopify_orders(self, api_key, shop_url, days_back=7):
        """Fetch orders from Shopify API"""
        headers = {'X-Shopify-Access-Token': api_key}
        url = f"{shop_url}/admin/api/2023-01/orders.json"
        
        orders = []
        since_date = (datetime.utcnow() - timedelta(days=days_back)).isoformat()
        
        params = {
            'status': 'any',
            'updated_at_min': since_date,
            'limit': 250
        }
        
        while url:
            try:
                response = requests.get(url, headers=headers, params=params)
                response.raise_for_status()
                
                data = response.json()
                orders.extend(data['orders'])
                
                # Handle pagination
                if 'Link' in response.headers:
                    links = response.headers['Link'].split(',')
                    url = None
                    for link in links:
                        if 'rel="next"' in link:
                            url = link.split(';')[0].strip('<>')
                else:
                    url = None
                    
            except requests.RequestException as e:
                logger.error(f"Error fetching Shopify orders: {e}")
                raise
        
        return pd.DataFrame([
            {
                'order_id': order['id'],
                'created_at': order['created_at'],
                'total_price': float(order['total_price']),
                'currency': order['currency'],
                'source': 'shopify'
            }
            for order in orders
        ])
    
    def normalize_and_load(self, dataframes_by_source):
        """Combine data from different sources and load to warehouse"""
        normalized_data = []
        
        for source, df in dataframes_by_source.items():
            df['source'] = source
            df['loaded_at'] = datetime.utcnow()
            normalized_data.append(df)
        
        combined_df = pd.concat(normalized_data, ignore_index=True)
        
        # Deduplication
        combined_df = combined_df.drop_duplicates(subset=['order_id', 'source'])
        
        # Load to database
        try:
            combined_df.to_sql('orders', self.db, if_exists='append', index=False)
            logger.info(f"Loaded {len(combined_df)} orders to warehouse")
        except Exception as e:
            logger.error(f"Error loading to database: {e}")
            raise

# Usage
etl = OrdersETL(db_connection)
shopify_orders = etl.fetch_shopify_orders(api_key, shop_url)
woocommerce_orders = etl.fetch_woocommerce_orders(api_key, site_url)
etl.normalize_and_load({
    'shopify': shopify_orders,
    'woocommerce': woocommerce_orders
})

Результат: Сократили время загрузки данных с 3 часов (ручной процесс) до 15 минут (автоматизированный). Исключили ошибки дублирования и формата.

2. Анализ данных и расчёт метрик

Задача: Когортный анализ retention пользователей

Необходимо было понять, как качество аудитории меняется со временем — какой процент пользователей, зарегистрировавшихся в январе, остаются активными в марте, апреле и т.д.

import pandas as pd
import numpy as np

def cohort_analysis(user_sessions):
    """
    Calculate retention cohorts
    user_sessions: DataFrame with columns [user_id, date, activity_type]
    """
    # Get first activity date for each user
    first_activity = user_sessions.groupby('user_id')['date'].min().reset_index()
    first_activity.columns = ['user_id', 'cohort_date']
    first_activity['cohort'] = first_activity['cohort_date'].dt.to_period('M')
    
    # Join with all activities
    user_sessions = user_sessions.merge(first_activity[['user_id', 'cohort']], on='user_id')
    user_sessions['activity_month'] = user_sessions['date'].dt.to_period('M')
    
    # Calculate months since first activity
    user_sessions['months_since_cohort'] = (
        user_sessions['activity_month'] - user_sessions['cohort']
    ).apply(lambda x: x.n)
    
    # Build cohort table
    cohort_data = user_sessions.groupby(['cohort', 'months_since_cohort'])['user_id'].nunique().reset_index()
    cohort_pivot = cohort_data.pivot_table(
        index='cohort',
        columns='months_since_cohort',
        values='user_id'
    )
    
    # Calculate retention rates (% of original cohort)
    cohort_size = cohort_pivot.iloc[:, 0]
    retention_cohort = cohort_pivot.divide(cohort_size, axis=0) * 100
    
    return retention_cohort

# Usage
retention_table = cohort_analysis(user_activity_df)
print(retention_table.round(1))
# Output:
#              0      1      2      3      4      5
# cohort
# 2023-01     100.0  62.5  48.3  42.1  38.9  35.7
# 2023-02     100.0  58.2  45.6  39.8  36.2   NaN
# 2023-03     100.0  61.4  49.7  44.3   NaN   NaN

Результат: Выявили, что 3-месячный retention составляет ~40-50%. Это позволило бизнесу сфокусироваться на улучшении онбординга и ценностного предложения в первый месяц использования.

3. Статистический анализ и A/B тестирование

Задача: Анализ результатов A/B теста нового алгоритма рекомендаций

from scipy import stats
from scipy.stats import ttest_ind, chi2_contingency

def analyze_ab_test(control_group, treatment_group, metric='conversion'):
    """
    Perform statistical analysis of A/B test
    """
    # Descriptive statistics
    control_mean = control_group[metric].mean()
    treatment_mean = treatment_group[metric].mean()
    
    control_std = control_group[metric].std()
    treatment_std = treatment_group[metric].std()
    
    print(f"Control: mean={control_mean:.4f}, std={control_std:.4f}, n={len(control_group)}")
    print(f"Treatment: mean={treatment_mean:.4f}, std={treatment_std:.4f}, n={len(treatment_group)}")
    print(f"Difference: {(treatment_mean - control_mean) / control_mean * 100:.1f}%\n")
    
    # Two-sample t-test
    t_stat, p_value = ttest_ind(control_group[metric], treatment_group[metric])
    
    print(f"T-test:")
    print(f"  t-statistic: {t_stat:.4f}")
    print(f"  p-value: {p_value:.4f}")
    
    alpha = 0.05
    if p_value < alpha:
        print(f"  ✓ Результат ЗНАЧИМ (p < {alpha})")
    else:
        print(f"  ✗ Результат НЕ значим (p >= {alpha})")
    
    # Calculate confidence interval for the difference
    pooled_std = np.sqrt(
        ((len(control_group) - 1) * control_std**2 + 
         (len(treatment_group) - 1) * treatment_std**2) / 
        (len(control_group) + len(treatment_group) - 2)
    )
    
    se_diff = pooled_std * np.sqrt(1/len(control_group) + 1/len(treatment_group))
    mean_diff = treatment_mean - control_mean
    
    t_critical = stats.t.ppf(0.975, len(control_group) + len(treatment_group) - 2)
    ci_lower = mean_diff - t_critical * se_diff
    ci_upper = mean_diff + t_critical * se_diff
    
    print(f"\n95% CI для разницы: [{ci_lower:.4f}, {ci_upper:.4f}]")
    
    return {
        'control_mean': control_mean,
        'treatment_mean': treatment_mean,
        'p_value': p_value,
        'is_significant': p_value < alpha,
        'ci_lower': ci_lower,
        'ci_upper': ci_upper
    }

# Usage
results = analyze_ab_test(control_df, treatment_df, metric='revenue_per_user')

4. Прогностическое моделирование

Задача: Прогноз churn пользователей

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, precision_recall_curve

def predict_churn(user_features_df):
    """
    Predict which users will churn in next 30 days
    """
    # Feature engineering
    features = [
        'days_since_signup',
        'sessions_last_7_days',
        'avg_session_duration',
        'features_used',
        'support_tickets',
        'payment_failures'
    ]
    
    X = user_features_df[features]
    y = user_features_df['churned_in_30_days']
    
    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=42, stratify=y
    )
    
    # Train model
    model = RandomForestClassifier(
        n_estimators=100,
        max_depth=10,
        min_samples_split=50,
        random_state=42,
        class_weight='balanced'  # Handle imbalanced data
    )
    model.fit(X_train, y_train)
    
    # Evaluate
    train_auc = roc_auc_score(y_train, model.predict_proba(X_train)[:, 1])
    test_auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])
    
    print(f"Train AUC: {train_auc:.3f}")
    print(f"Test AUC: {test_auc:.3f}")
    
    # Feature importance
    importance_df = pd.DataFrame({
        'feature': features,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)
    
    print("\nTop features:")
    print(importance_df.head(5))
    
    # Predict churn probability for all users
    churn_prob = model.predict_proba(X)[:, 1]
    user_features_df['churn_probability'] = churn_prob
    
    # Identify high-risk users (top 20%)
    high_risk_threshold = churn_prob.quantile(0.8)
    high_risk_users = user_features_df[user_features_df['churn_probability'] >= high_risk_threshold]
    
    return model, high_risk_users

# Usage
model, at_risk = predict_churn(user_features)
print(f"\n{len(at_risk)} users at risk of churn")

Результат: Модель помогла выявить 2000 пользователей с высоким риском churn. Проактивная коммуникация с ними снизила фактический churn на 18%.

5. Работа с большими объёмами данных

Задача: Анализ логов в формате JSON размером 50GB

import pandas as pd
from dask import dataframe as dd

# Использование Dask для обработки больших данных
logs_df = dd.read_json('s3://bucket/logs/*.json', lines=True)

# Фильтрация и агрегирование
error_logs = logs_df[logs_df['level'] == 'ERROR']
errors_by_endpoint = error_logs.groupby('endpoint')['request_id'].count()

# Вычисление результатов
result = errors_by_endpoint.compute().sort_values(ascending=False)
print(result.head(10))

Итоговый результат

Все эти задачи были автоматизированы и интегрированы в Airflow, позволяя аналитической команде работать с актуальными данными в реальном времени. Python позволил мне:

  • Удалить ручные процессы
  • Обеспечить воспроизводимость анализа
  • Масштабировать обработку данных
  • Применить продвинутые статистические методы

Это дало бизнесу возможность принимать решения на основе актуальных данных, а не старых отчётов.