← Назад к вопросам

Работал с технологией Ray в Linux системах

1.0 Junior🔥 111 комментариев
#Linux и администрирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Ray: распределённые вычисления в Linux системах

Ray - это современный фреймворк для распределённых вычислений, разработанный в UC Berkeley. Рассажу о моём опыте работы с ним в Linux системах и production окружении.

Что такое Ray

Ray - это фреймворк для:

  • Распределённых вычислений (distributed computing)
  • Параллельной обработки данных (parallel processing)
  • Machine Learning training (масштабирование ML)
  • Microservices (масштабируемые сервисы)

Архитектура Ray

┌─────────────────────────────────────────┐
│           Ray Driver (main)             │
│  (ваш Python скрипт, Jupyter notebook)  │
└────────────────┬────────────────────────┘
                 │
       ┌─────────┴──────────┐
       │   Ray Head Node    │
       │   (главный узел)   │
       │  - Scheduler       │
       │  - GCS (metadata)  │
       └──────────┬─────────┘
                  │
    ┌─────────────┼─────────────┐
    │             │             │
┌───▼──┐      ┌───▼──┐      ┌───▼──┐
│Worker│      │Worker│      │Worker│
│Node1 │      │Node2 │      │Node3 │
└──────┘      └──────┘      └──────┘

Установка в Linux

# Ubuntu/Debian
sudo apt-get install -y build-essential python3-dev

# Установить Ray
pip install -U ray

# Проверить установку
python -c "import ray; print(ray.__version__)"

# Для GPU support
pip install ray[tune]
pip install ray[rllib]  # Для reinforcement learning
pip install ray[ml]    # Для machine learning

Запуск Ray кластера

Локальный кластер (для development):

import ray

# Инициализация локального кластера
ray.init()

# Информация о кластере
info = ray.cluster_resources()
print(f"CPU cores: {info['CPU']}")
print(f"Memory: {info['memory']}")

# Завершить
ray.shutdown()

Распределённый кластер (multi-node):

# На head node
ray start --head --port=6379 --num-cpus=8

# На worker nodes
ray start --address=<head_ip>:6379 --num-cpus=8

# Проверить статус
ray status

# UI для мониторинга (Ray Dashboard)
# Доступна на http://localhost:8265

Основные концепции Ray

1. Remote Functions (распределённые функции)

import ray
ray.init()

# Обычная функция
def slow_function(x):
    import time
    time.sleep(2)
    return x ** 2

# Remote функция (выполняется на worker)
@ray.remote
def remote_slow_function(x):
    import time
    time.sleep(2)
    return x ** 2

# Обычное выполнение (блокирующее)
result = slow_function(5)  # Ждёт 2 секунды

# Ray выполнение (не блокирующее)
future = remote_slow_function.remote(5)  # Возвращает сразу
result = ray.get(future)  # Ждёт результат

2. Remote Actors (состояние на worker)

@ray.remote
class Counter:
    def __init__(self):
        self.count = 0
    
    def increment(self):
        self.count += 1
        return self.count
    
    def get_count(self):
        return self.count

# Создать актор
counter = Counter.remote()

# Вызвать методы
future1 = counter.increment.remote()
future2 = counter.increment.remote()
future3 = counter.get_count.remote()

print(ray.get([future1, future2, future3]))  # [1, 2, 2]

3. Ray Tasks vs Actors

Tasks (функции):
- Stateless (без состояния)
- Параллельное выполнение
- Горизонтально масштабируются

Actors (объекты):
- Stateful (с состоянием)
- Сериализованное выполнение (по одному)
- Для кэширования, счётчиков, конфигурации

Практический пример: обработка больших данных

import ray
import numpy as np
from ray.util import put_object

ray.init()

# Большой датасет
large_data = np.random.rand(1000000, 100)

# Положить в Ray Object Store (разделится между workers)
data_ref = ray.put(large_data)

@ray.remote
def process_chunk(data_chunk):
    # Тяжёлая обработка
    return np.sum(data_chunk ** 2)

# Распределить обработку
num_chunks = 4
chunk_size = len(large_data) // num_chunks

futures = []
for i in range(num_chunks):
    start = i * chunk_size
    end = start + chunk_size if i < num_chunks - 1 else len(large_data)
    
    # Обработка в параллель
    future = process_chunk.remote(
        large_data[start:end]
    )
    futures.append(future)

# Собрать результаты
results = ray.get(futures)
print(f"Total: {sum(results)}")

Ray Tune (гиперпараметрический поиск)

from ray import tune
from ray.tune import CLIReporter

def train_model(config):
    """Функция обучения модели"""
    learning_rate = config["lr"]
    batch_size = config["batch_size"]
    
    for epoch in range(10):
        # Обучение
        accuracy = 0.9 + epoch * 0.01 + learning_rate
        
        # Отправить метрику в Ray
        tune.report(accuracy=accuracy)

# Гиперпараметрический поиск
analysis = tune.run(
    train_model,
    config={
        "lr": tune.grid_search([0.001, 0.01, 0.1]),
        "batch_size": tune.choice([32, 64, 128]),
    },
    num_samples=2,  # 2 trials на каждую комбинацию
    verbose=1,
    progress_reporter=CLIReporter(metric_columns=["accuracy"])
)

# Лучшая конфигурация
print(f"Best config: {analysis.best_config}")
print(f"Best accuracy: {analysis.best_result['accuracy']}")

Ray RLLib (Reinforcement Learning)

from ray.rllib.algorithms.ppo import PPO

# Создать и натренировать policy
trainer = PPO(
    env="CartPole-v1",
    config={
        "framework": "torch",
        "num_rollout_workers": 4,
    }
)

# Обучение
for i in range(10):
    result = trainer.train()
    print(f"Episode {i}: reward={result['episode_reward_mean']}")

# Сохранить и загрузить модель
trainer.save("/tmp/ray_cartpole")

Ray Serve (ML inference serving)

from ray import serve
import pickle

serve.start()

@serve.deployment
class ModelServing:
    def __init__(self, model_path):
        with open(model_path, 'rb') as f:
            self.model = pickle.load(f)
    
    async def __call__(self, request):
        data = request.json()
        prediction = self.model.predict([data['features']])
        return {"prediction": float(prediction[0])}

# Развернуть
serve.run(
    ModelServing.bind("/path/to/model.pkl"),
    route_prefix="/predict"
)

# HTTP запрос
# POST http://localhost:8000/predict
# {"features": [1.0, 2.0, 3.0]}

Мониторинг Ray кластера

Ray Dashboard (UI)

# Автоматически доступна на http://localhost:8265
# Показывает:
# - Количество tasks/actors
# - CPU/Memory usage
# - Error logs
# - Object Store

Программный мониторинг

import ray

# Информация о кластере
print(ray.cluster_resources())

# Информация о tasks
print(ray.timeline())

# Информация о memory
print(ray.available_resources())

# Список running tasks
print(ray.tasks())

Production Best Practices

1. Настройка ресурсов

@ray.remote(num_cpus=2, num_gpus=1, memory=1000*1024*1024)
def gpu_task():
    import torch
    # Используем GPU
    pass

2. Error handling

import ray

@ray.remote(max_retries=3)
def reliable_task():
    # Автоматический retry при ошибке
    pass

# Обработка ошибок
try:
    result = ray.get(future)
except ray.exceptions.RayError as e:
    print(f"Task failed: {e}")

3. Скейлинг

# Auto-scaling через Kubernetes
ray up cluster.yaml  # Запустить кластер
ray exec cluster.yaml "python script.py"  # Выполнить скрипт

4. Persistent Storage

# Использовать S3 для сохранения модели
ray_client = ray.init(runtime_env={
    "env_vars": {"AWS_ACCESS_KEY_ID": "..."},
})

Проблемы и решения

1. OOM (Out of Memory)

# Решение: уменьшить batch size или количество workers
ray.init(object_store_memory=1000*1024*1024)  # 1GB

2. Slow serialization

# Проблема: большие объекты медленно сериализуются
# Решение: использовать ray.put() для больших данных
data_ref = ray.put(large_array)
future = process.remote(data_ref)

3. Network bottleneck

# Решение: используй ray.plasma для inter-node communication
ray start --object-manager-port=8076

Выводы

Ray в Linux - это мощная технология для: ✅ Распределённых вычислений ✅ Machine Learning масштабирования ✅ Параллельной обработки больших данных ✅ Микросервисов на Python

Основные use cases:

  • Гиперпараметрический поиск для ML (Ray Tune)
  • Reinforcement Learning (Ray RLLib)
  • ML inference serving (Ray Serve)
  • General distributed computing

Для DevOps: Ray хорошо интегрируется с Kubernetes, имеет мониторинг через Dashboard, поддерживает auto-scaling.

Правило: используй Ray когда нужно масштабировать Python приложение на несколько машин с минимальными изменениями кода.