← Назад к вопросам

Какие есть виды окон агрегации?

2.0 Middle🔥 171 комментариев
#Apache Kafka и потоковая обработка

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Виды окон агрегации

Окна агрегации — это методы разбиения потока данных или массивов на логические подмножества для применения агрегирующих функций (SUM, COUNT, AVG и т.д.). Это отличается от временных окон — здесь речь о логическом группировании данных.

1. Фиксированные окна (Fixed-size Windows)

Окна фиксированного размера с перекрытием или без.

Tumbling Windows (неперекрывающиеся):

# Разбить массив на окна размером 5
import numpy as np

data = np.arange(20)
window_size = 5

# Метод 1: numpy reshape
windows = data.reshape(-1, window_size)
print(windows)
# [[ 0  1  2  3  4]
#  [ 5  6  7  8  9]
#  [10 11 12 13 14]
#  [15 16 17 18 19]]

# Агрегирование
sums = np.sum(windows, axis=1)
# [10, 35, 60, 85]

Sliding Windows (перекрывающиеся):

# Скользящее окно размером 3 со сдвигом 1
data = [1, 2, 3, 4, 5, 6, 7]
window_size = 3
stride = 1

windows = [data[i:i+window_size] for i in range(0, len(data)-window_size+1, stride)]
print(windows)
# [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7]]

# Агрегирование (например, скользящее среднее)
sums = [sum(window) for window in windows]
# [6, 9, 12, 15, 18]

2. Функции агрегации по окнам

SQL Window Functions

-- ROW_NUMBER(): номер строки в окне
SELECT 
    user_id,
    amount,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY amount DESC) as row_num
FROM orders;
-- Результат: каждому заказу номер в пределах пользователя

-- RANK(): ранг с пропусками
SELECT
    user_id,
    salary,
    RANK() OVER (ORDER BY salary DESC) as salary_rank
FROM employees;
-- Если зарплаты равны — ранги пропускаются (1, 1, 3, 4)

-- DENSE_RANK(): ранг без пропусков
SELECT
    user_id,
    salary,
    DENSE_RANK() OVER (ORDER BY salary DESC) as dense_rank
FROM employees;
-- (1, 1, 2, 3) — ранги идут подряд

-- NTILE(n): разбить на n групп
SELECT
    user_id,
    salary,
    NTILE(4) OVER (ORDER BY salary) as quartile
FROM employees;
-- Разбить на 4 квартиля

Агрегирующие функции в окне

-- SUM, AVG, COUNT с OVER
SELECT
    order_date,
    amount,
    SUM(amount) OVER (
        PARTITION BY user_id 
        ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as cumulative_sum,
    AVG(amount) OVER (
        PARTITION BY user_id
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) as moving_avg_3
FROM orders
ORDER BY user_id, order_date;

-- ROWS BETWEEN определяет границы окна:
-- UNBOUNDED PRECEDING: от начала
-- N PRECEDING: N строк до текущей
-- CURRENT ROW: текущая строка
-- N FOLLOWING: N строк после
-- UNBOUNDED FOLLOWING: до конца

3. Типы окон по размеру

Скользящее окно фиксированного размера

# Moving Average (скользящее среднее)
def moving_average(data, window_size):
    """Скользящее среднее с окном N"""
    result = []
    for i in range(len(data)):
        start = max(0, i - window_size + 1)
        window = data[start:i+1]
        result.append(sum(window) / len(window))
    return result

prices = [100, 102, 101, 103, 105, 104, 106]
ma_3 = moving_average(prices, 3)
print(ma_3)
# [100.0, 101.0, 101.0, 102.0, 103.0, 104.0, 105.0]

Экспоненциально взвешенное окно

# Exponential Weighted Moving Average (EWMA)
import pandas as pd

data = pd.Series([100, 102, 101, 103, 105, 104, 106])
alpha = 0.3  # Коэффициент сглаживания

ewma = data.ewm(alpha=alpha).mean()
print(ewma.values)
# Недавние значения весят больше

4. Окна в потоковой обработке

Spark Streaming

# Spark Structured Streaming с разными окнами
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, count

spark = SparkSession.builder.appName("Windows").getOrCreate()

# Читаем поток
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Tumbling window: 5 минут
tumbling = df.groupBy(
    window(col("timestamp"), "5 minutes")
).count()

# Sliding window: 10 минут с 5-минутным сдвигом
sliding = df.groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes")
).count()

# Session window: 10 минут неактивности
session = df.groupBy(
    col("user_id"),
    session_window(col("timestamp"), "10 minutes")
).count()

Apache Flink Window Functions

// Java Flink
DataStream<Event> events = // ...

// Tumbling window
events
    .keyBy(event -> event.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new SumAggregate())
    .addSink(new RollingSink<>("/path/to/output"));

// Sliding window
events
    .keyBy(event -> event.userId)
    .window(SlidingEventTimeWindows.of(
        Time.minutes(10),  // window size
        Time.minutes(5)    // slide
    ))
    .reduce((e1, e2) -> e1.amount + e2.amount)
    .print();

5. Окна по значениям (Value-based Windows)

Сегментирование по значениям вместо размера:

# Окна по достижению определённого значения
def value_based_windows(data, threshold):
    """Разбить на окна по сумме значений"""
    windows = []
    current_window = []
    current_sum = 0
    
    for value in data:
        if current_sum + value > threshold:
            if current_window:
                windows.append(current_window)
            current_window = [value]
            current_sum = value
        else:
            current_window.append(value)
            current_sum += value
    
    if current_window:
        windows.append(current_window)
    
    return windows

data = [10, 20, 15, 25, 30, 5]
windows = value_based_windows(data, 50)
print(windows)
# [[10, 20, 15], [25, 30], [5]]

6. Сравнение типов окон

ТипПерекрытиеГраницыСлучай использования
TumblingНетФиксированЧасовые отчёты
SlidingДаФиксированСкользящее среднее
SessionНетДинамическийАнализ сеансов
Value-basedПеременноПо значениюБатчи по размеру
EWMAПостоянноЭкспоненциальныйВременные ряды

7. Практический пример: обработка logов

# Подсчёт ошибок по часам + скользящее среднее
import pandas as pd
from datetime import datetime, timedelta

logs = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=240, freq='15min'),
    'error_count': [5, 3, 7, 2, 8, 4, 6, 9, 3, 5] * 24  # 10 дней
})

# Hourly aggregation (tumbling)
hourly = logs.set_index('timestamp').resample('1H').sum()
print("Hourly errors:")
print(hourly.head())

# 4-hour moving average (sliding)
moving_avg = logs.set_index('timestamp')['error_count'].rolling('4H').mean()
print("\n4-hour moving average:")
print(moving_avg.head(10))

# Exponential weighted
ewma = logs['error_count'].ewm(span=12).mean()  # 3 часа
print("\nEWMA (3-hour span):")
print(ewma.head())

8. Оптимизация окон для производительности

# Использование numpy для больших данных
import numpy as np
from scipy.ndimage import uniform_filter1d

data = np.random.rand(1000000)
window_size = 100

# Быстрое скользящее среднее
moving_avg = uniform_filter1d(data, size=window_size, mode='nearest')

# Vs. медленное
import time
start = time.time()
for i in range(len(data) - window_size):
    _ = np.mean(data[i:i+window_size])
print(f"Slow: {time.time() - start:.3f}s")

start = time.time()
_ = uniform_filter1d(data, size=window_size)
print(f"Fast: {time.time() - start:.3f}s")

Выбор правильного типа окна зависит от семантики задачи и требований к производительности.