Работал с технологией Ray в Linux системах
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Ray: распределённые вычисления в Linux системах
Ray - это современный фреймворк для распределённых вычислений, разработанный в UC Berkeley. Рассажу о моём опыте работы с ним в Linux системах и production окружении.
Что такое Ray
Ray - это фреймворк для:
- Распределённых вычислений (distributed computing)
- Параллельной обработки данных (parallel processing)
- Machine Learning training (масштабирование ML)
- Microservices (масштабируемые сервисы)
Архитектура Ray
┌─────────────────────────────────────────┐
│ Ray Driver (main) │
│ (ваш Python скрипт, Jupyter notebook) │
└────────────────┬────────────────────────┘
│
┌─────────┴──────────┐
│ Ray Head Node │
│ (главный узел) │
│ - Scheduler │
│ - GCS (metadata) │
└──────────┬─────────┘
│
┌─────────────┼─────────────┐
│ │ │
┌───▼──┐ ┌───▼──┐ ┌───▼──┐
│Worker│ │Worker│ │Worker│
│Node1 │ │Node2 │ │Node3 │
└──────┘ └──────┘ └──────┘
Установка в Linux
# Ubuntu/Debian
sudo apt-get install -y build-essential python3-dev
# Установить Ray
pip install -U ray
# Проверить установку
python -c "import ray; print(ray.__version__)"
# Для GPU support
pip install ray[tune]
pip install ray[rllib] # Для reinforcement learning
pip install ray[ml] # Для machine learning
Запуск Ray кластера
Локальный кластер (для development):
import ray
# Инициализация локального кластера
ray.init()
# Информация о кластере
info = ray.cluster_resources()
print(f"CPU cores: {info['CPU']}")
print(f"Memory: {info['memory']}")
# Завершить
ray.shutdown()
Распределённый кластер (multi-node):
# На head node
ray start --head --port=6379 --num-cpus=8
# На worker nodes
ray start --address=<head_ip>:6379 --num-cpus=8
# Проверить статус
ray status
# UI для мониторинга (Ray Dashboard)
# Доступна на http://localhost:8265
Основные концепции Ray
1. Remote Functions (распределённые функции)
import ray
ray.init()
# Обычная функция
def slow_function(x):
import time
time.sleep(2)
return x ** 2
# Remote функция (выполняется на worker)
@ray.remote
def remote_slow_function(x):
import time
time.sleep(2)
return x ** 2
# Обычное выполнение (блокирующее)
result = slow_function(5) # Ждёт 2 секунды
# Ray выполнение (не блокирующее)
future = remote_slow_function.remote(5) # Возвращает сразу
result = ray.get(future) # Ждёт результат
2. Remote Actors (состояние на worker)
@ray.remote
class Counter:
def __init__(self):
self.count = 0
def increment(self):
self.count += 1
return self.count
def get_count(self):
return self.count
# Создать актор
counter = Counter.remote()
# Вызвать методы
future1 = counter.increment.remote()
future2 = counter.increment.remote()
future3 = counter.get_count.remote()
print(ray.get([future1, future2, future3])) # [1, 2, 2]
3. Ray Tasks vs Actors
Tasks (функции):
- Stateless (без состояния)
- Параллельное выполнение
- Горизонтально масштабируются
Actors (объекты):
- Stateful (с состоянием)
- Сериализованное выполнение (по одному)
- Для кэширования, счётчиков, конфигурации
Практический пример: обработка больших данных
import ray
import numpy as np
from ray.util import put_object
ray.init()
# Большой датасет
large_data = np.random.rand(1000000, 100)
# Положить в Ray Object Store (разделится между workers)
data_ref = ray.put(large_data)
@ray.remote
def process_chunk(data_chunk):
# Тяжёлая обработка
return np.sum(data_chunk ** 2)
# Распределить обработку
num_chunks = 4
chunk_size = len(large_data) // num_chunks
futures = []
for i in range(num_chunks):
start = i * chunk_size
end = start + chunk_size if i < num_chunks - 1 else len(large_data)
# Обработка в параллель
future = process_chunk.remote(
large_data[start:end]
)
futures.append(future)
# Собрать результаты
results = ray.get(futures)
print(f"Total: {sum(results)}")
Ray Tune (гиперпараметрический поиск)
from ray import tune
from ray.tune import CLIReporter
def train_model(config):
"""Функция обучения модели"""
learning_rate = config["lr"]
batch_size = config["batch_size"]
for epoch in range(10):
# Обучение
accuracy = 0.9 + epoch * 0.01 + learning_rate
# Отправить метрику в Ray
tune.report(accuracy=accuracy)
# Гиперпараметрический поиск
analysis = tune.run(
train_model,
config={
"lr": tune.grid_search([0.001, 0.01, 0.1]),
"batch_size": tune.choice([32, 64, 128]),
},
num_samples=2, # 2 trials на каждую комбинацию
verbose=1,
progress_reporter=CLIReporter(metric_columns=["accuracy"])
)
# Лучшая конфигурация
print(f"Best config: {analysis.best_config}")
print(f"Best accuracy: {analysis.best_result['accuracy']}")
Ray RLLib (Reinforcement Learning)
from ray.rllib.algorithms.ppo import PPO
# Создать и натренировать policy
trainer = PPO(
env="CartPole-v1",
config={
"framework": "torch",
"num_rollout_workers": 4,
}
)
# Обучение
for i in range(10):
result = trainer.train()
print(f"Episode {i}: reward={result['episode_reward_mean']}")
# Сохранить и загрузить модель
trainer.save("/tmp/ray_cartpole")
Ray Serve (ML inference serving)
from ray import serve
import pickle
serve.start()
@serve.deployment
class ModelServing:
def __init__(self, model_path):
with open(model_path, 'rb') as f:
self.model = pickle.load(f)
async def __call__(self, request):
data = request.json()
prediction = self.model.predict([data['features']])
return {"prediction": float(prediction[0])}
# Развернуть
serve.run(
ModelServing.bind("/path/to/model.pkl"),
route_prefix="/predict"
)
# HTTP запрос
# POST http://localhost:8000/predict
# {"features": [1.0, 2.0, 3.0]}
Мониторинг Ray кластера
Ray Dashboard (UI)
# Автоматически доступна на http://localhost:8265
# Показывает:
# - Количество tasks/actors
# - CPU/Memory usage
# - Error logs
# - Object Store
Программный мониторинг
import ray
# Информация о кластере
print(ray.cluster_resources())
# Информация о tasks
print(ray.timeline())
# Информация о memory
print(ray.available_resources())
# Список running tasks
print(ray.tasks())
Production Best Practices
1. Настройка ресурсов
@ray.remote(num_cpus=2, num_gpus=1, memory=1000*1024*1024)
def gpu_task():
import torch
# Используем GPU
pass
2. Error handling
import ray
@ray.remote(max_retries=3)
def reliable_task():
# Автоматический retry при ошибке
pass
# Обработка ошибок
try:
result = ray.get(future)
except ray.exceptions.RayError as e:
print(f"Task failed: {e}")
3. Скейлинг
# Auto-scaling через Kubernetes
ray up cluster.yaml # Запустить кластер
ray exec cluster.yaml "python script.py" # Выполнить скрипт
4. Persistent Storage
# Использовать S3 для сохранения модели
ray_client = ray.init(runtime_env={
"env_vars": {"AWS_ACCESS_KEY_ID": "..."},
})
Проблемы и решения
1. OOM (Out of Memory)
# Решение: уменьшить batch size или количество workers
ray.init(object_store_memory=1000*1024*1024) # 1GB
2. Slow serialization
# Проблема: большие объекты медленно сериализуются
# Решение: использовать ray.put() для больших данных
data_ref = ray.put(large_array)
future = process.remote(data_ref)
3. Network bottleneck
# Решение: используй ray.plasma для inter-node communication
ray start --object-manager-port=8076
Выводы
Ray в Linux - это мощная технология для: ✅ Распределённых вычислений ✅ Machine Learning масштабирования ✅ Параллельной обработки больших данных ✅ Микросервисов на Python
Основные use cases:
- Гиперпараметрический поиск для ML (Ray Tune)
- Reinforcement Learning (Ray RLLib)
- ML inference serving (Ray Serve)
- General distributed computing
Для DevOps: Ray хорошо интегрируется с Kubernetes, имеет мониторинг через Dashboard, поддерживает auto-scaling.
Правило: используй Ray когда нужно масштабировать Python приложение на несколько машин с минимальными изменениями кода.