Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
UDF в Spark (User Defined Function)
UDF (User Defined Function) — это пользовательская функция, которая позволяет расширить функционал Spark SQL и DataFrame API. UDF используется для обработки данных, которая не может быть реализована встроенными функциями.
Типы UDF в Spark
В Spark существует несколько типов UDF:
1. Python UDF (PySpark)
Основный способ создания UDF на Python:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
# Определение функции
def extract_domain(email: str) -> str:
if email:
return email.split("@")[1]
return None
# Регистрация как UDF
extract_domain_udf = udf(extract_domain, StringType())
# Использование
df = spark.createDataFrame([("user@gmail.com",), ("admin@company.org",)], ["email"])
result = df.select(extract_domain_udf("email"))
2. SQL UDF
Регистрация UDF через SQL:
CREATE TEMPORARY FUNCTION extract_domain(email STRING)
RETURNS STRING
RETURN SUBSTRING(email, INSTR(email, "@") + 1);
SELECT email, extract_domain(email) as domain
FROM users;
3. Pandas UDF (Arrow UDF)
Более эффективный способ обработки больших объёмов данных:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
@pandas_udf(StringType())
def extract_domain_pandas(emails: pd.Series) -> pd.Series:
return emails.str.split("@").str[1]
# Использование
df = spark.createDataFrame([("user@gmail.com",), ("admin@company.org",)], ["email"])
result = df.select(extract_domain_pandas("email"))
Параметры типов данных
При создании UDF нужно указать тип возвращаемого значения:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
# Простой тип
udf_int = udf(lambda x: int(x) * 2, IntegerType())
# Array
udf_array = udf(lambda x: x.split(","), ArrayType(StringType()))
# Complex Type
struct_type = StructType([
StructField("id", IntegerType()),
StructField("name", StringType())
])
def parse_json(json_str):
import json
data = json.loads(json_str)
return (data["id"], data["name"])
udf_struct = udf(parse_json, struct_type)
Производительность: Python UDF vs Pandas UDF
Python UDF выполняет функцию для каждой строки:
# Медленнее: вызов для каждой строки
@udf(StringType())
def slow_udf(x):
return x.upper()
Pandas UDF обрабатывает батчи данных через Arrow:
# Быстрее: батч-обработка
@pandas_udf(StringType())
def fast_udf(x: pd.Series) -> pd.Series:
return x.str.upper()
Pandas UDF может быть в 100+ раз быстрее на больших данных!
Практический пример: сложная обработка
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
@pandas_udf(DoubleType())
def calculate_risk_score(amounts: pd.Series, frequencies: pd.Series) -> pd.Series:
# Сложная бизнес-логика
return (amounts * 0.7 + frequencies * 0.3) / 100
df = spark.createDataFrame(
[(100.0, 5), (500.0, 2), (50.0, 10)],
["amount", "frequency"]
)
result = df.select(
"amount",
"frequency",
calculate_risk_score("amount", "frequency").alias("risk_score")
)
Когда использовать UDF
Используй UDF, если:
- Встроенные функции Spark не решают задачу
- Нужна сложная бизнес-логика
- Интеграция с внешними библиотеками (scikit-learn, requests)
Избегай UDF, если:
- Можно использовать встроенные функции (они быстрее)
- Сложная логика может быть выражена SQL
- Критична максимальная производительность
Лучшие практики
- Используй Pandas UDF вместо обычного Python UDF для лучшей производительности
- Кешируй результаты если UDF дорогая в вычислении
- Тестируй UDF отдельно перед использованием в DataFrame
- Избегай побочных эффектов (вывод в лог, изменение глобального состояния)
UDF — мощный инструмент для расширения функционала Spark, но требует внимания к производительности.