← Назад к вопросам

Как задать временные окна в потоковой обработке?

1.8 Middle🔥 181 комментариев
#Apache Kafka и потоковая обработка

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Временные окна в потоковой обработке

Временные окна (Time Windows) — это логическое разделение непрерывного потока данных на отдельные интервалы времени для обработки и агрегирования. Это ключевой механизм для аналитики потоков в режиме реального времени.

Основные типы временных окон

1. Tumbling Windows (Переворачивающиеся окна)

Фиксированные непересекающиеся окна. Каждый элемент принадлежит ровно одному окну.

Характеристики:

  • Размер окна фиксирован (например, 5 минут)
  • Окна не перекрываются
  • После окончания окна сразу начинается новое
  • Простая и понятная семантика
# Apache Spark Structured Streaming
from pyspark.sql.functions import window

# Окна по 5 минут
windowed_df = df.groupBy(
    window(col("timestamp"), "5 minutes")
).agg({
    "amount": "sum",
    "user_id": "count"
})

# Результат: для каждого 5-минутного окна суммы и количество
-- Flink SQL
SELECT
    TUMBLE_END(event_time, INTERVAL '5' MINUTE) as window_end,
    user_id,
    COUNT(*) as events,
    SUM(amount) as total
FROM events
GROUP BY
    TUMBLE(event_time, INTERVAL '5' MINUTE),
    user_id;

Использование:

  • Почасовые/ежедневные отчёты
  • Агрегирование метрик по фиксированным интервалам
  • Подсчёт событий за период

2. Sliding Windows (Скользящие окна)

Пересекающиеся окна, которые движутся с заданным шагом (slide).

Характеристики:

  • Размер окна: 10 минут
  • Шаг скольжения (slide): 5 минут
  • Окна перекрываются
  • Элемент может принадлежать нескольким окнам
# Spark: окно 10 минут со сдвигом 5 минут
windowed_df = df.groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes")
).agg({
    "amount": "sum"
})

# Окно [12:00-12:10], [12:05-12:15], [12:10-12:20] ...
-- Flink: скользящее окно
SELECT
    HOP_END(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
    COUNT(*) as count
FROM events
GROUP BY
    HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE);

Использование:

  • Скользящее среднее (moving average)
  • Обнаружение аномалий
  • Real-time мониторинг тренда
  • "Последние N минут" метрики

3. Session Windows (Сеансовые окна)

Динамические окна, основанные на активности. Окно закрывается при отсутствии активности в течение timeout.

Характеристики:

  • Размер окна динамический
  • Определяется периодом неактивности (gap)
  • Окна не пересекаются
  • Полезны для анализа поведения пользователей
# Spark: сеансовое окно с gap 10 минут
from pyspark.sql.functions import col, session_window

session_df = df.groupBy(
    col("user_id"),
    session_window(col("timestamp"), "10 minutes")
).agg({
    "amount": "sum",
    "event_id": "count"
})

# Если пользователь неактивен 10 минут — сеанс закрывается
# Kafka Streams
from confluent_kafka.admin import AdminClient

# SessionStore хранит сеансы с таймаутом
kstream.groupByKey().windowedBy(
    SessionWindows.with_(timedelta(minutes=10))
).aggregate(
    lambda: {"count": 0, "total": 0},
    lambda k, v, agg: {"count": agg["count"] + 1, "total": agg["total"] + v["amount"]}
)

Использование:

  • Анализ сеансов пользователя
  • Группировка событий по сеансам
  • Обнаружение периодов активности

4. Custom Windows

Пользовательские временные окна с кастомной логикой.

# Пример: окна переменного размера
from pyspark.sql.functions import col, when

# Часовые окна в рабочее время, 4-часовые в нерабочее
df_with_window = df.withColumn(
    "window_size",
    when((hour(col("timestamp")) >= 9) & (hour(col("timestamp")) < 18),
         "1 hour").otherwise("4 hours")
)

Обработка поздних данных (Late Data)

Важный аспект потоковой обработки — что делать с данными, которые пришли позже закрытия окна.

# Spark Structured Streaming
query = df.groupBy(
    window(col("timestamp"), "5 minutes")
).agg(col("amount").sum()) \
.writeStream \
.option("watermark.delay", "10 minutes")  # Ждём 10 минут поздних данных
.option("outputMode", "append")  # Только новые окна
.start()

# outputMode варианты:
# - append: финальные результаты (когда окно закрыто)
# - update: обновления при поздних данных
# - complete: все результаты заново

Watermark — граница обработанных данных

Watermark отслеживает максимальный timestamp обработанных данных и определяет, когда окно считается завершённым.

from pyspark.sql.functions import col, window

df_with_watermark = df \
    .withWatermark("timestamp", "10 minutes")  # Ждём 10 минут поздних данных

result = df_with_watermark.groupBy(
    window(col("timestamp"), "5 minutes")
).agg(col("amount").sum())

# Окно закрывается после timestamp + 10 минут

Сравнение подходов

ТипПерекрытиеРазмерСлучай использования
TumblingНетФиксированПочасовые отчёты
SlidingДаФиксированСкользящее среднее
SessionНетДинамическийАнализ сеансов
CustomЗависитЗависитСпецифичные требования

Практический пример: Real-time метрики

# Подсчёт RPS (запросов в секунду) со скользящим окном
requests_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "requests") \
    .load()

requests_df = requests_df \
    .select(
        from_json(col("value").cast("string"), schema).alias("data"),
        col("timestamp")
    ) \
    .select(col("data.endpoint"), col("timestamp"))

rps_metrics = requests_df \
    .withWatermark("timestamp", "1 minute") \
    .groupBy(
        col("endpoint"),
        window(col("timestamp"), "1 minute", "10 seconds")
    ) \
    .agg(count("*").alias("request_count")) \
    .select(
        col("endpoint"),
        col("window.start"),
        col("window.end"),
        (col("request_count") / 60).alias("rps")
    )

query = rps_metrics.writeStream \
    .format("console") \
    .outputMode("update") \
    .start()

Выбор правильного типа временного окна критичен для корректной аналитики потока данных в режиме реального времени.

Как задать временные окна в потоковой обработке? | PrepBro