← Назад к вопросам

PySpark: Определение пользовательских сессий

1.7 Middle🔥 211 комментариев
#Apache Spark#Python#Аналитика и метрики

Условие

Дан датасет с логами активности пользователей:

from pyspark.sql import SparkSession
from pyspark.sql.types import *

schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("event_time", TimestampType()),
    StructField("event_type", StringType())
])

Пример данных:

user_idevent_timeevent_type
12024-01-15 10:00:00click
12024-01-15 10:05:00view
12024-01-15 10:35:00click
12024-01-15 10:40:00purchase
22024-01-15 11:00:00click

Сессия определяется как последовательность событий пользователя с интервалом менее 30 минут между событиями.

Задание:

  1. Напишите PySpark код для присвоения session_id каждому событию
  2. Рассчитайте длительность каждой сессии
  3. Найдите среднюю длительность сессии по пользователям
  4. Определите количество событий в каждой сессии

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Решение

Задача и подход

Нужно определить сессии пользователей с интервалом инактивности менее 30 минут. Для этого используем PySpark с функциями работы с окнами (Window Functions) и обнаружением разрывов в хронологической последовательности.

Основной алгоритм:

  1. Сортируем события по user_id и event_time
  2. Вычисляем разницу времени между текущим и предыдущим событием
  3. Помечаем начало новой сессии, если разница > 30 минут
  4. Присваиваем session_id через cumsum переходов
  5. Агрегируем данные по сессиям

Шаг 1: Присвоение session_id

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from datetime import datetime

spark = SparkSession.builder.appName("SessionAnalysis").getOrCreate()

# Создаём DataFrame
data = [
    (1, datetime(2024, 1, 15, 10, 0, 0), "click"),
    (1, datetime(2024, 1, 15, 10, 5, 0), "view"),
    (1, datetime(2024, 1, 15, 10, 35, 0), "click"),
    (1, datetime(2024, 1, 15, 10, 40, 0), "purchase"),
    (2, datetime(2024, 1, 15, 11, 0, 0), "click"),
]

df = spark.createDataFrame(data, schema)

# Сортируем по user_id и event_time
window_spec = Window.partitionBy("user_id").orderBy("event_time")

# Вычисляем время с предыдущего события в секундах
df_with_lag = df.withColumn(
    "time_diff_seconds",
    (unix_timestamp("event_time") - unix_timestamp(lag("event_time").over(window_spec))) / 60
).fillna(0)

# Определяем начало сессии (null или разница > 30 минут)
df_with_session_flag = df_with_lag.withColumn(
    "is_new_session",
    when(col("time_diff_seconds") > 30, 1).otherwise(0)
).fillna(0)

# Присваиваем session_id через cumsum
df_with_session_id = df_with_session_flag.withColumn(
    "session_id",
    sum(col("is_new_session")).over(window_spec)
).select("user_id", "event_time", "event_type", "session_id")

Шаг 2: Длительность сессии

session_duration = df_with_session_id.groupBy("user_id", "session_id").agg(
    min("event_time").alias("session_start"),
    max("event_time").alias("session_end"),
    (unix_timestamp(max("event_time")) - unix_timestamp(min("event_time"))) / 60
    .alias("duration_minutes")
)

Шаг 3: Средняя длительность по пользователям

avg_duration_by_user = session_duration.groupBy("user_id").agg(
    avg("duration_minutes").alias("avg_session_duration_minutes"),
    count("session_id").alias("total_sessions")
)

avg_duration_by_user.show()

Шаг 4: Количество событий в сессии

events_per_session = df_with_session_id.groupBy("user_id", "session_id").agg(
    count("*").alias("event_count"),
    collect_list("event_type").alias("event_types")
).join(
    session_duration,
    on=["user_id", "session_id"]
)

events_per_session.show()

Результаты

Для примера из условия получим:

  • User 1: 2 сессии (первая: 5 мин, вторая: 5 мин)
  • User 2: 1 сессия (0 мин)
  • Средняя длительность User 1: 5 минут

Это решение масштабируется на большие объёмы данных и использует оптимизированные операции PySpark.

PySpark: Определение пользовательских сессий | PrepBro