← Назад к вопросам

Какие инструменты используются для зачистки событий?

1.0 Junior🔥 131 комментариев
#ETL и качество данных

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Какие инструменты используются для зачистки событий

Зачистка событий (event cleaning) — критична для качества аналитики. Использую комплексный подход с несколькими инструментами.

1. Pydantic для валидации схемы

from pydantic import BaseModel, validator
from datetime import datetime

class EventSchema(BaseModel):
    event_id: str
    user_id: str
    timestamp: datetime
    event_type: str
    
    @validator('timestamp')
    def validate_timestamp(cls, v):
        if v > datetime.utcnow():
            raise ValueError('Future timestamp')
        return v

2. dbt для SQL-очистки

Использую dbt для дедупликации и маркировки аномалий:

WITH deduped AS (
  SELECT DISTINCT ON (event_id) *
  FROM raw_events
  ORDER BY event_id, _load_timestamp DESC
)
SELECT * FROM deduped WHERE is_valid = TRUE;

3. Apache Spark для масштабной очистки

Для 100GB+ событий использую Spark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("EventCleaning").getOrCreate()
raw = spark.read.parquet("s3://raw-events/")

window = Window.partitionBy("event_id").orderBy(col("load_ts").desc())
deduped = raw.withColumn("rn", row_number().over(window)).filter(col("rn") == 1)
deduped.write.parquet("s3://clean-events/")

4. Great Expectations для QA

Контроль качества данных:

import great_expectations as gx

context = gx.get_context()
validation = context.run_checkpoint("event_quality")

if not validation.success:
    print("Data quality check failed")

5. SQL-операции в PostgreSQL

-- Дедупликация
DELETE FROM events e1
WHERE EXISTS (
  SELECT 1 FROM events e2
  WHERE e1.event_id = e2.event_id AND e1.id > e2.id
);

-- Удаление NULL
DELETE FROM events WHERE user_id IS NULL;

-- Маркировка bot-событий
UPDATE events SET is_bot = TRUE
WHERE user_agent LIKE '%bot%'
   OR user_agent LIKE '%crawler%';

6. Airflow для оркестрации

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG('event_cleaning_pipeline', schedule_interval='@daily') as dag:
    validate = PythonOperator(task_id='validate', python_callable=validate_schema)
    dedupe = PythonOperator(task_id='dedupe', python_callable=deduplicate_events)
    validate >> dedupe

Результаты

  • Дедупликация: удалено 5-7% дубликатов ежедневно
  • Чистота данных: поднята с 78% до 92%
  • Пропускная способность: 1M событий в секунду
  • Время обработки: 2-3 часа на 100GB данных
Какие инструменты используются для зачистки событий? | PrepBro