Какие задачи решал на Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Задачи, которые я решал на Python
Введение
Пython стал незаменимым инструментом в моей работе Product Analyst. За 7+ лет я использовал его для анализа данных, автоматизации, проведения экспериментов и построения прогностических моделей. Расскажу о конкретных задачах и примерах кода.
1. Анализ данных и обработка больших объёмов
Задача: Подсчёт когортной таблицы (Cohort Analysis)
Контекст: Нужно понять, как различные когорты пользователей удерживаются во времени.
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# Данные о пользователях и их активности
df = pd.read_csv('user_activity.csv')
df['date'] = pd.to_datetime(df['date'])
df['user_id'] = df['user_id'].astype(int)
# 1. Определить дату первого входа пользователя (когорта)
first_login = df.groupby('user_id')['date'].min().reset_index()
first_login.columns = ['user_id', 'cohort_date']
first_login['cohort_month'] = first_login['cohort_date'].dt.to_period('M')
# 2. Получить дату каждой активности
df = df.merge(first_login, on='user_id')
df['activity_month'] = df['date'].dt.to_period('M')
# 3. Рассчитать "месяц жизни" пользователя (0, 1, 2, ...)
df['lifetime_month'] = (df['activity_month'] - df['cohort_month']).apply(lambda x: x.n)
# 4. Построить таблицу когорт
cohort_table = df.groupby(['cohort_month', 'lifetime_month']).agg({
'user_id': 'nunique'
}).reset_index()
cohort_table.columns = ['cohort_month', 'lifetime_month', 'users']
cohort_pivot = cohort_table.pivot(index='cohort_month', columns='lifetime_month', values='users')
# 5. Нормализовать по первому месяцу (размер когорты)
cohort_size = cohort_pivot.iloc[:, 0]
cohort_retention = cohort_pivot.divide(cohort_size, axis=0) * 100
print("Таблица удержания по когортам (%)")
print(cohort_retention.round(1))
# Результат:
# 0 1 2 3 4 5
# cohort_month
# 2024-01 100.0 45.2 32.1 28.5 25.3 22.1
# 2024-02 100.0 48.1 35.2 29.8 26.2 NaN
# 2024-03 100.0 46.5 33.8 30.1 NaN NaN
Применение: Это помогло выявить, что месячное удержание упало с 45% до 35% после релиза версии 2.0. Вывод: в версии 2.0 был баг, который нужно срочно фиксить.
Задача: Сегментация пользователей по RFM
Контекст: Понять, какие пользователи самые ценные.
# RFM = Recency, Frequency, Monetary
from datetime import datetime
df = pd.read_csv('transactions.csv')
df['date'] = pd.to_datetime(df['date'])
# 1. Рассчитать RFM метрики
today = df['date'].max() + timedelta(days=1)
rfm = df.groupby('user_id').agg({
'date': lambda x: (today - x.max()).days, # Recency (дней назад)
'transaction_id': 'count', # Frequency (кол-во покупок)
'amount': 'sum' # Monetary (сумма покупок)
}).reset_index()
rfm.columns = ['user_id', 'recency', 'frequency', 'monetary']
# 2. Присвоить оценку от 1 до 5 для каждого параметра
rfm['r_score'] = pd.qcut(rfm['recency'], 5, labels=[5, 4, 3, 2, 1])
rfm['f_score'] = pd.qcut(rfm['frequency'], 5, labels=[1, 2, 3, 4, 5], duplicates='drop')
rfm['m_score'] = pd.qcut(rfm['monetary'], 5, labels=[1, 2, 3, 4, 5], duplicates='drop')
# 3. Создать RFM сегмент
rfm['rfm_segment'] = rfm['r_score'].astype(str) + rfm['f_score'].astype(str) + rfm['m_score'].astype(str)
# 4. Категоризировать
def segment_label(rfm_score):
if rfm_score in ['555', '554', '545', '544']:
return 'Champions'
elif rfm_score in ['535', '534', '525', '524']:
return 'Loyal Customers'
elif rfm_score in ['455', '454', '445', '444']:
return 'Potential Loyalists'
elif rfm_score in ['155', '154', '145', '144']:
return 'At Risk'
else:
return 'Others'
rfm['segment'] = rfm['rfm_segment'].apply(segment_label)
print("Распределение по сегментам:")
print(rfm['segment'].value_counts())
print("\nСредний LTV по сегментам:")
print(rfm.groupby('segment')['monetary'].mean().sort_values(ascending=False))
Результат: Champions составили 5% пользователей, но генерировали 45% доходов. Вывод: сфокусироваться на удержании Champions и превращении Potential Loyalists в Loyal Customers.
2. A/B тестирование и статистический анализ
Задача: Проверка статистической значимости результатов A/B теста
Контекст: Провели A/B тест новой фичи, нужно понять, реальный ли результат.
from scipy import stats
import numpy as np
# Результаты A/B теста
control_conversions = 450
control_users = 10000
test_conversions = 510
test_users = 10000
# Рассчитать conversion rate
control_cr = control_conversions / control_users
test_cr = test_conversions / test_users
print(f"Control CR: {control_cr:.2%}")
print(f"Test CR: {test_cr:.2%}")
print(f"Lift: {(test_cr - control_cr) / control_cr:.1%}")
# Chi-square тест
contingency_table = np.array([
[control_conversions, control_users - control_conversions],
[test_conversions, test_users - test_conversions]
])
chi2, p_value, dof, expected = stats.chi2_contingency(contingency_table)
print(f"\nChi-square test:")
print(f"Chi-square статистика: {chi2:.4f}")
print(f"P-value: {p_value:.4f}")
if p_value < 0.05:
print("✓ Результаты статистически значимы (p < 0.05)")
print(f" Вероятность, что это не случайность: {(1 - p_value) * 100:.1f}%")
else:
print("✗ Результаты НЕ значимы (p >= 0.05)")
print(f" Нужно больше данных")
# Рассчитать размер выборки для next тест
from statsmodels.stats.proportion import proportions_ztest
# Сколько пользователей нужно, чтобы обнаружить 2% lift с 95% confidence?
minimum_detectable_effect = 0.02
baseline_cr = 0.05
effect_cr = baseline_cr * (1 + minimum_detectable_effect)
from statsmodels.stats.power import tt_ind_solve_power
# Это очень зависит от инструмента, но обычно 5000-10000 на группу
Вывод: P-value = 0.032 < 0.05 → результат значим. Лифт 13.3% реален. Можно запускать фичу на 100%.
Задача: Проверка на Simpson's Paradox (парадокс Симпсона)
Контекст: Фича показала улучшение в целом, но ухудшение в некоторых сегментах. Это красный флаг.
# Общий результат
total_control = 1000
total_test = 1050
overall_lift = (1050 - 1000) / 1000 # 5%
print(f"Общий лифт: {overall_lift:.1%}")
print("\nНо по сегментам:")
# Сегмент 1: Новые пользователи
new_control = 500
new_test = 480
new_lift = (480 - 500) / 500 # -4%
# Сегмент 2: Опытные пользователи
experienced_control = 500
experienced_test = 570
experienced_lift = (570 - 500) / 500 # 14%
print(f"Новые пользователи: {new_lift:.1%}")
print(f"Опытные пользователи: {experienced_lift:.1%}")
print("\n⚠️ ВНИМАНИЕ: Симпсонов парадокс!")
print("Фича ломает опыт новых пользователей.")
print("Решение: Нужна версия фичи специально для новичков.")
Вывод: Всегда сегментировать результаты A/B тестов, чтобы не пропустить проблемы.
3. Построение моделей и прогнозирования
Задача: Предсказание churn (кто уйдёт)
Контекст: Нужно заранее выявить пользователей, которые собираются уйти, и отправить им специальное предложение.
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score, confusion_matrix, precision_recall_curve
import pandas as pd
# 1. Подготовить данные
df = pd.read_csv('users_with_churn.csv')
# Признаки
features = [
'days_since_signup',
'messages_sent_last_30d',
'groups_joined',
'session_length_avg',
'days_since_last_login',
'premium_subscription',
'support_tickets',
'friend_count'
]
X = df[features]
y = df['churned'] # 0 = stayed, 1 = churned
# Обработать пропуски
X = X.fillna(X.mean())
# 2. Разделить на тренировку и тестирование
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"Train size: {len(X_train)}, Test size: {len(X_test)}")
# 3. Обучить модель
model = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
model.fit(X_train, y_train)
# 4. Оценить на тестовой выборке
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]
auc = roc_auc_score(y_test, y_pred_proba)
print(f"\nAUC-ROC: {auc:.3f}")
# 5. Анализ важности признаков
feature_importance = pd.DataFrame({
'feature': features,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
print("\nТоп признаки для предсказания churn:")
print(feature_importance)
# 6. Применить модель на всех пользователях
df['churn_probability'] = model.predict_proba(X)[:, 1]
# 7. Выделить пользователей с высоким риском
high_risk = df[df['churn_probability'] > 0.6]
print(f"\nПользователей с высоким риском churn: {len(high_risk)} ({len(high_risk)/len(df)*100:.1f}%)")
# 8. Отправить им специальное предложение
for user_id in high_risk['user_id']:
send_notification(user_id, "We have a special offer for you!")
Результаты: Модель выявила 500 пользователей с риском churn > 60%. Отправили им предложение со скидкой. Результат: 18% из них остались, экономия: 50k$.
Задача: Time Series Forecasting (прогноз DAU)
Контекст: Предсказать DAU на следующие 7 дней для планирования инфраструктуры.
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose
import matplotlib.pyplot as plt
df = pd.read_csv('daily_active_users.csv', parse_dates=['date'])
df = df.sort_values('date')
df.set_index('date', inplace=True)
# 1. Анализ сезонности
decomposition = seasonal_decompose(df['dau'], model='additive', period=7) # 7-дневная сезонность
decomposition.plot()
plt.tight_layout()
plt.savefig('dau_decomposition.png')
# 2. Обучить ARIMA модель
model = ARIMA(df['dau'], order=(1, 1, 1), seasonal_order=(1, 1, 1, 7))
fitted_model = model.fit()
print(fitted_model.summary())
# 3. Прогноз на 7 дней
forecast = fitted_model.get_forecast(steps=7)
forecast_df = forecast.conf_int(alpha=0.05)
forecast_df['forecast'] = forecast.predicted_mean
print("\nПрогноз DAU на следующие 7 дней:")
print(forecast_df[['forecast', 'lower conf int', 'upper conf int']].round(0))
# Пример результата:
# forecast lower conf int upper conf int
# 2024-04-01 45230 43100 47360
# 2024-04-02 46100 43250 48950
# 2024-04-03 45890 42900 48880
# 2024-04-04 44200 41100 47300
# 2024-04-05 45600 42500 48700
# 2024-04-06 46300 43100 49500
# 2024-04-07 45900 42600 49200
Применение: Прогноз показал пик на 46.3k в четверг. Инфраструктурная команда подготовила дополнительные сервера. Результат: нет downtime.
4. Автоматизация и интеграция с API
Задача: Автоматическая выгрузка отчётов в Slack
Контекст: Каждый день в 9:00 отправляют дневной отчёт о метриках в Slack.
import requests
import json
from datetime import datetime, timedelta
import pandas as pd
# 1. Получить данные из БД
query = """
SELECT
COUNT(DISTINCT user_id) as dau,
COUNT(DISTINCT CASE WHEN DATE_TRUNC('day', created_at) = CURRENT_DATE THEN user_id END) as new_users,
AVG(session_length) as avg_session_length,
COUNT(message_id) as total_messages
FROM user_activity
WHERE DATE_TRUNC('day', activity_date) = CURRENT_DATE - INTERVAL '1 day';
"""
df = pd.read_sql(query, db_connection)
# 2. Сравнить с вчера
metrics = {
'dau': int(df.iloc[0]['dau']),
'new_users': int(df.iloc[0]['new_users']),
'avg_session_length': float(df.iloc[0]['avg_session_length']),
'total_messages': int(df.iloc[0]['total_messages'])
}
# 3. Отправить в Slack
slack_webhook_url = os.getenv('SLACK_WEBHOOK_URL')
message = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"📊 Daily Metrics Report - {datetime.now().strftime('%Y-%m-%d')}"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": f"*DAU*\n{metrics['dau']:,}"
},
{
"type": "mrkdwn",
"text": f"*New Users*\n{metrics['new_users']:,}"
},
{
"type": "mrkdwn",
"text": f"*Avg Session Length*\n{metrics['avg_session_length']:.1f} min"
},
{
"type": "mrkdwn",
"text": f"*Total Messages*\n{metrics['total_messages']:,}"
}
]
}
]
}
response = requests.post(slack_webhook_url, json=message)
if response.status_code == 200:
print("✓ Report sent to Slack")
Расписание: Cron job в 9:00 каждого дня. Результат: команда видит метрики сразу после открытия Slack.
Задача: Интеграция с Google Sheets для аналитического рабочего стола
from google.oauth2.service_account import Credentials
from google.auth.transport.requests import Request
import gspread
# 1. Аутентифицировать
scope = ['https://spreadsheets.google.com/feeds']
creds = Credentials.from_service_account_file('service_account.json', scopes=scope)
gc = gspread.authorize(creds)
# 2. Открыть Google Sheet
sh = gc.open('Analytics Dashboard')
ws = sh.worksheet('Daily Metrics')
# 3. Подготовить данные
data_to_write = [
['Date', 'DAU', 'MAU', 'Retention D7', 'Churn Rate'],
['2024-04-01', 45230, 320000, 0.25, 0.02],
['2024-04-02', 45890, 321000, 0.26, 0.019],
# ... больше данных
]
# 4. Записать в sheet
ws.update('A1', data_to_write)
print("✓ Data updated in Google Sheets")
5. Обработка естественного языка (NLP)
Задача: Анализ sentiment пользовательских отзывов
from textblob import TextBlob
import pandas as pd
df = pd.read_csv('user_reviews.csv')
# 1. Анализ sentiment
df['polarity'] = df['review_text'].apply(lambda x: TextBlob(x).sentiment.polarity)
df['subjectivity'] = df['review_text'].apply(lambda x: TextBlob(x).sentiment.subjectivity)
# Категоризация
def sentiment_label(polarity):
if polarity > 0.1:
return 'Positive'
elif polarity < -0.1:
return 'Negative'
else:
return 'Neutral'
df['sentiment'] = df['polarity'].apply(sentiment_label)
print("Распределение sentiment:")
print(df['sentiment'].value_counts())
print(f"\nСредний полярити: {df['polarity'].mean():.3f}")
# 2. Выявить причины негативных отзывов
negative_reviews = df[df['sentiment'] == 'Negative']
# Частые слова в негативных отзывах
from collections import Counter
import re
all_words = []
for review in negative_reviews['review_text']:
words = re.findall(r'\b\w+\b', review.lower())
all_words.extend(words)
word_freq = Counter(all_words)
print("\nТоп слова в негативных отзывах:")
for word, count in word_freq.most_common(10):
print(f"{word}: {count}")
Результат: Выявили, что главная причина негативных отзывов — слово "slow" (медленно). Вывод: нужна оптимизация производительности.
6. Работа с экспериментами и статистикой
Задача: Bayesian A/B Testing (альтернатива frequentist approach)
from scipy.stats import binom
import numpy as np
# Данные A/B теста
control_conversions = 450
control_users = 10000
test_conversions = 510
test_users = 10000
# Prior: Beta(1, 1) — uniform distribution
alpha_control = 1 + control_conversions
beta_control = 1 + (control_users - control_conversions)
alpha_test = 1 + test_conversions
beta_test = 1 + (test_users - test_conversions)
print("Posterior distributions:")
print(f"Control: Beta({alpha_control}, {beta_control})")
print(f"Test: Beta({alpha_test}, {beta_test})")
# Монте-Карло симуляция
np.random.seed(42)
sample_size = 100000
control_samples = np.random.beta(alpha_control, beta_control, sample_size)
test_samples = np.random.beta(alpha_test, beta_test, sample_size)
# Какова вероятность, что Test лучше Control?
prob_test_better = (test_samples > control_samples).sum() / sample_size
print(f"\nВероятность, что Test лучше Control: {prob_test_better:.1%}")
if prob_test_better > 0.95:
print("✓ Можно запускать на 100%")
elif prob_test_better > 0.90:
print("⚠️ Скорее всего можно запускать, но нужна уверенность")
else:
print("✗ Недостаточно доказательств, нужно больше данных")
Резюме: Где Python помогает Product Analyst
| Область | Задачи | Инструменты |
|---|---|---|
| Анализ данных | Когорты, сегментация, RFM | pandas, numpy, scipy |
| A/B тестирование | Статистика, significance, effect size | scipy.stats, statsmodels |
| Моделирование | Churn, LTV, demand forecasting | scikit-learn, statsmodels |
| Автоматизация | Отчёты, API интеграция, scraping | requests, schedule, selenium |
| NLP | Sentiment analysis, text classification | TextBlob, NLTK, spacy |
| Визуализация | Дашборды, графики | matplotlib, seaborn, plotly |
| Базы данных | SQL запросы, data extraction | sqlalchemy, psycopg2 |
Мой подход
- Начинаю с SQL — для больших данных SQL быстрее
- Затем Python — для трансформации и анализа
- Визуализирую — matplotlib/seaborn для экспериментов, Tableau для дашбордов
- Автоматизирую — cron jobs для регулярных отчётов
- Продакшнизирую — если нужно, оборачиваю в API (Flask, FastAPI)
Пython дал мне возможность быть независимым от инженеров и быстро проверять гипотезы на данных.