← Назад к вопросам
Какие есть виды окон агрегации?
2.0 Middle🔥 171 комментариев
#Apache Kafka и потоковая обработка
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Виды окон агрегации
Окна агрегации — это методы разбиения потока данных или массивов на логические подмножества для применения агрегирующих функций (SUM, COUNT, AVG и т.д.). Это отличается от временных окон — здесь речь о логическом группировании данных.
1. Фиксированные окна (Fixed-size Windows)
Окна фиксированного размера с перекрытием или без.
Tumbling Windows (неперекрывающиеся):
# Разбить массив на окна размером 5
import numpy as np
data = np.arange(20)
window_size = 5
# Метод 1: numpy reshape
windows = data.reshape(-1, window_size)
print(windows)
# [[ 0 1 2 3 4]
# [ 5 6 7 8 9]
# [10 11 12 13 14]
# [15 16 17 18 19]]
# Агрегирование
sums = np.sum(windows, axis=1)
# [10, 35, 60, 85]
Sliding Windows (перекрывающиеся):
# Скользящее окно размером 3 со сдвигом 1
data = [1, 2, 3, 4, 5, 6, 7]
window_size = 3
stride = 1
windows = [data[i:i+window_size] for i in range(0, len(data)-window_size+1, stride)]
print(windows)
# [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7]]
# Агрегирование (например, скользящее среднее)
sums = [sum(window) for window in windows]
# [6, 9, 12, 15, 18]
2. Функции агрегации по окнам
SQL Window Functions
-- ROW_NUMBER(): номер строки в окне
SELECT
user_id,
amount,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY amount DESC) as row_num
FROM orders;
-- Результат: каждому заказу номер в пределах пользователя
-- RANK(): ранг с пропусками
SELECT
user_id,
salary,
RANK() OVER (ORDER BY salary DESC) as salary_rank
FROM employees;
-- Если зарплаты равны — ранги пропускаются (1, 1, 3, 4)
-- DENSE_RANK(): ранг без пропусков
SELECT
user_id,
salary,
DENSE_RANK() OVER (ORDER BY salary DESC) as dense_rank
FROM employees;
-- (1, 1, 2, 3) — ранги идут подряд
-- NTILE(n): разбить на n групп
SELECT
user_id,
salary,
NTILE(4) OVER (ORDER BY salary) as quartile
FROM employees;
-- Разбить на 4 квартиля
Агрегирующие функции в окне
-- SUM, AVG, COUNT с OVER
SELECT
order_date,
amount,
SUM(amount) OVER (
PARTITION BY user_id
ORDER BY order_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as cumulative_sum,
AVG(amount) OVER (
PARTITION BY user_id
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as moving_avg_3
FROM orders
ORDER BY user_id, order_date;
-- ROWS BETWEEN определяет границы окна:
-- UNBOUNDED PRECEDING: от начала
-- N PRECEDING: N строк до текущей
-- CURRENT ROW: текущая строка
-- N FOLLOWING: N строк после
-- UNBOUNDED FOLLOWING: до конца
3. Типы окон по размеру
Скользящее окно фиксированного размера
# Moving Average (скользящее среднее)
def moving_average(data, window_size):
"""Скользящее среднее с окном N"""
result = []
for i in range(len(data)):
start = max(0, i - window_size + 1)
window = data[start:i+1]
result.append(sum(window) / len(window))
return result
prices = [100, 102, 101, 103, 105, 104, 106]
ma_3 = moving_average(prices, 3)
print(ma_3)
# [100.0, 101.0, 101.0, 102.0, 103.0, 104.0, 105.0]
Экспоненциально взвешенное окно
# Exponential Weighted Moving Average (EWMA)
import pandas as pd
data = pd.Series([100, 102, 101, 103, 105, 104, 106])
alpha = 0.3 # Коэффициент сглаживания
ewma = data.ewm(alpha=alpha).mean()
print(ewma.values)
# Недавние значения весят больше
4. Окна в потоковой обработке
Spark Streaming
# Spark Structured Streaming с разными окнами
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, count
spark = SparkSession.builder.appName("Windows").getOrCreate()
# Читаем поток
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "events") \
.load()
# Tumbling window: 5 минут
tumbling = df.groupBy(
window(col("timestamp"), "5 minutes")
).count()
# Sliding window: 10 минут с 5-минутным сдвигом
sliding = df.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes")
).count()
# Session window: 10 минут неактивности
session = df.groupBy(
col("user_id"),
session_window(col("timestamp"), "10 minutes")
).count()
Apache Flink Window Functions
// Java Flink
DataStream<Event> events = // ...
// Tumbling window
events
.keyBy(event -> event.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new SumAggregate())
.addSink(new RollingSink<>("/path/to/output"));
// Sliding window
events
.keyBy(event -> event.userId)
.window(SlidingEventTimeWindows.of(
Time.minutes(10), // window size
Time.minutes(5) // slide
))
.reduce((e1, e2) -> e1.amount + e2.amount)
.print();
5. Окна по значениям (Value-based Windows)
Сегментирование по значениям вместо размера:
# Окна по достижению определённого значения
def value_based_windows(data, threshold):
"""Разбить на окна по сумме значений"""
windows = []
current_window = []
current_sum = 0
for value in data:
if current_sum + value > threshold:
if current_window:
windows.append(current_window)
current_window = [value]
current_sum = value
else:
current_window.append(value)
current_sum += value
if current_window:
windows.append(current_window)
return windows
data = [10, 20, 15, 25, 30, 5]
windows = value_based_windows(data, 50)
print(windows)
# [[10, 20, 15], [25, 30], [5]]
6. Сравнение типов окон
| Тип | Перекрытие | Границы | Случай использования |
|---|---|---|---|
| Tumbling | Нет | Фиксирован | Часовые отчёты |
| Sliding | Да | Фиксирован | Скользящее среднее |
| Session | Нет | Динамический | Анализ сеансов |
| Value-based | Переменно | По значению | Батчи по размеру |
| EWMA | Постоянно | Экспоненциальный | Временные ряды |
7. Практический пример: обработка logов
# Подсчёт ошибок по часам + скользящее среднее
import pandas as pd
from datetime import datetime, timedelta
logs = pd.DataFrame({
'timestamp': pd.date_range('2024-01-01', periods=240, freq='15min'),
'error_count': [5, 3, 7, 2, 8, 4, 6, 9, 3, 5] * 24 # 10 дней
})
# Hourly aggregation (tumbling)
hourly = logs.set_index('timestamp').resample('1H').sum()
print("Hourly errors:")
print(hourly.head())
# 4-hour moving average (sliding)
moving_avg = logs.set_index('timestamp')['error_count'].rolling('4H').mean()
print("\n4-hour moving average:")
print(moving_avg.head(10))
# Exponential weighted
ewma = logs['error_count'].ewm(span=12).mean() # 3 часа
print("\nEWMA (3-hour span):")
print(ewma.head())
8. Оптимизация окон для производительности
# Использование numpy для больших данных
import numpy as np
from scipy.ndimage import uniform_filter1d
data = np.random.rand(1000000)
window_size = 100
# Быстрое скользящее среднее
moving_avg = uniform_filter1d(data, size=window_size, mode='nearest')
# Vs. медленное
import time
start = time.time()
for i in range(len(data) - window_size):
_ = np.mean(data[i:i+window_size])
print(f"Slow: {time.time() - start:.3f}s")
start = time.time()
_ = uniform_filter1d(data, size=window_size)
print(f"Fast: {time.time() - start:.3f}s")
Выбор правильного типа окна зависит от семантики задачи и требований к производительности.