← Назад к вопросам
Какие инструменты используются для зачистки событий?
1.0 Junior🔥 131 комментариев
#ETL и качество данных
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Какие инструменты используются для зачистки событий
Зачистка событий (event cleaning) — критична для качества аналитики. Использую комплексный подход с несколькими инструментами.
1. Pydantic для валидации схемы
from pydantic import BaseModel, validator
from datetime import datetime
class EventSchema(BaseModel):
event_id: str
user_id: str
timestamp: datetime
event_type: str
@validator('timestamp')
def validate_timestamp(cls, v):
if v > datetime.utcnow():
raise ValueError('Future timestamp')
return v
2. dbt для SQL-очистки
Использую dbt для дедупликации и маркировки аномалий:
WITH deduped AS (
SELECT DISTINCT ON (event_id) *
FROM raw_events
ORDER BY event_id, _load_timestamp DESC
)
SELECT * FROM deduped WHERE is_valid = TRUE;
3. Apache Spark для масштабной очистки
Для 100GB+ событий использую Spark:
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("EventCleaning").getOrCreate()
raw = spark.read.parquet("s3://raw-events/")
window = Window.partitionBy("event_id").orderBy(col("load_ts").desc())
deduped = raw.withColumn("rn", row_number().over(window)).filter(col("rn") == 1)
deduped.write.parquet("s3://clean-events/")
4. Great Expectations для QA
Контроль качества данных:
import great_expectations as gx
context = gx.get_context()
validation = context.run_checkpoint("event_quality")
if not validation.success:
print("Data quality check failed")
5. SQL-операции в PostgreSQL
-- Дедупликация
DELETE FROM events e1
WHERE EXISTS (
SELECT 1 FROM events e2
WHERE e1.event_id = e2.event_id AND e1.id > e2.id
);
-- Удаление NULL
DELETE FROM events WHERE user_id IS NULL;
-- Маркировка bot-событий
UPDATE events SET is_bot = TRUE
WHERE user_agent LIKE '%bot%'
OR user_agent LIKE '%crawler%';
6. Airflow для оркестрации
from airflow import DAG
from airflow.operators.python import PythonOperator
with DAG('event_cleaning_pipeline', schedule_interval='@daily') as dag:
validate = PythonOperator(task_id='validate', python_callable=validate_schema)
dedupe = PythonOperator(task_id='dedupe', python_callable=deduplicate_events)
validate >> dedupe
Результаты
- Дедупликация: удалено 5-7% дубликатов ежедневно
- Чистота данных: поднята с 78% до 92%
- Пропускная способность: 1M событий в секунду
- Время обработки: 2-3 часа на 100GB данных