← Назад к вопросам

HTTP клиент к внешнему сервису с батчами

2.0 Middle🔥 211 комментариев
#Сетевые протоколы и API

Условие

Есть внешний сервис, который обрабатывает объекты батчами. Сервис может обрабатывать только n элементов за временной интервал p. Реализуйте клиент, который позволит обрабатывать максимальное количество объектов без блокировки.

Интерфейс

type BatchClient struct {
    batchSize int
    interval  time.Duration
    // ваши поля
}

func NewBatchClient(batchSize int, interval time.Duration) *BatchClient
func (c *BatchClient) Process(ctx context.Context, items []string) error

Требования

  • Не блокировать отправителей
  • Группировать запросы в батчи размера batchSize
  • Отправлять батч не чаще чем раз в interval
  • Поддерживать отмену через context

Это реальное тестовое задание из KazanExpress

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

HTTP клиент с батчами - реальное решение KazanExpress

Описание задачи

Нужно реализовать батч-клиент, который:

  • Накапливает запросы и отправляет их группами (батчами)
  • Соблюдает ограничение n элементов за период p
  • Не блокирует отправителей
  • Поддерживает graceful shutdown через context
  • Гарантирует доставку данных

Это реальная задача для работы с rate-limited API (Telegram, Yandex, 1C, etc.)

Архитектурное решение

Основные компоненты:

  1. Входящий канал - для получения элементов от клиентов
  2. Горутина-батчер - накапливает элементы и отправляет батчи
  3. Таймер - гарантирует отправку даже если батч не полный
  4. Результаты - каналы для получения результатов/ошибок

Реализация

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"
)

type BatchClient struct {
    batchSize int
    interval  time.Duration
    
    // Внутренние поля
    itemsCh   chan string
    stopCh    chan struct{}
    wg        sync.WaitGroup
}

func NewBatchClient(batchSize int, interval time.Duration) *BatchClient {
    return &BatchClient{
        batchSize: batchSize,
        interval:  interval,
        itemsCh:   make(chan string, batchSize*2), // буфер для non-blocking sends
        stopCh:    make(chan struct{}),
    }
}

// Process добавляет элементы в очередь для батча (не блокирует)
func (c *BatchClient) Process(ctx context.Context, items []string) error {
    for _, item := range items {
        select {
        case c.itemsCh <- item:
            // Успешно добавили в очередь
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}

// Start запускает батчер (вызывается один раз при инициализации)
func (c *BatchClient) Start(ctx context.Context) {
    c.wg.Add(1)
    go c.batchWorker(ctx)
}

// Stop gracefully завершает работу батчера
func (c *BatchClient) Stop(timeout time.Duration) error {
    close(c.itemsCh) // сигнал о прекращении приема новых элементов
    
    done := make(chan struct{})
    go func() {
        c.wg.Wait()
        close(done)
    }()
    
    select {
    case <-done:
        return nil
    case <-time.After(timeout):
        return fmt.Errorf("timeout waiting for batch worker to stop")
    }
}

// batchWorker обрабатывает элементы батчами
func (c *BatchClient) batchWorker(ctx context.Context) {
    defer c.wg.Done()
    
    batch := make([]string, 0, c.batchSize)
    ticker := time.NewTicker(c.interval)
    defer ticker.Stop()
    
    for {
        select {
        case item, ok := <-c.itemsCh:
            if !ok {
                // Канал закрыт, отправляем оставшиеся элементы
                if len(batch) > 0 {
                    c.sendBatch(ctx, batch)
                }
                return
            }
            
            batch = append(batch, item)
            
            // Если батч полный, отправляем сразу
            if len(batch) >= c.batchSize {
                c.sendBatch(ctx, batch)
                batch = make([]string, 0, c.batchSize)
                ticker.Reset(c.interval)  // сбросить таймер
            }
            
        case <-ticker.C:
            // Таймер сработал - отправляем частичный батч если есть элементы
            if len(batch) > 0 {
                c.sendBatch(ctx, batch)
                batch = make([]string, 0, c.batchSize)
            }
            
        case <-ctx.Done():
            // Контекст отменен
            if len(batch) > 0 {
                c.sendBatch(ctx, batch)
            }
            return
        }
    }
}

// sendBatch отправляет батч на внешний сервис
func (c *BatchClient) sendBatch(ctx context.Context, batch []string) {
    log.Printf("Sending batch of %d items", len(batch))
    
    // Здесь реальный HTTP запрос к сервису
    // Для примера используем mock
    if err := c.callService(ctx, batch); err != nil {
        log.Printf("Error sending batch: %v", err)
        // В реальном коде здесь была бы retry логика
    }
}

// callService имитирует HTTP запрос к внешнему сервису
func (c *BatchClient) callService(ctx context.Context, items []string) error {
    // Реальная реализация:
    // payload := map[string]interface{}{"items": items}
    // body, _ := json.Marshal(payload)
    // req, _ := http.NewRequestWithContext(ctx, "POST", "https://api.example.com/batch", bytes.NewReader(body))
    // resp, err := http.DefaultClient.Do(req)
    // if err != nil { return err }
    // return resp.Body.Close()
    
    // Mock для примера
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(100 * time.Millisecond):
        return nil
    }
}

func main() {
    fmt.Println("=== Batch Client Example ===")
    
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    client := NewBatchClient(
        3,              // batchSize = 3
        500*time.Millisecond, // interval = 500ms
    )
    
    // Запускаем батчер
    client.Start(ctx)
    
    // Отправляем элементы
    items1 := []string{"item1", "item2"}
    client.Process(ctx, items1)
    
    items2 := []string{"item3", "item4", "item5"}
    client.Process(ctx, items2)
    
    items3 := []string{"item6"}
    client.Process(ctx, items3)
    
    // Даем время на обработку
    time.Sleep(2 * time.Second)
    
    // Graceful shutdown
    if err := client.Stop(5 * time.Second); err != nil {
        log.Printf("Stop error: %v", err)
    }
    
    fmt.Println("\\nBatch client stopped gracefully")
}

Улучшенная версия с результатами и retry

type ResultChannel struct {
    Items []string
    Err   error
}

type BatchClientV2 struct {
    batchSize  int
    interval   time.Duration
    maxRetries int
    
    itemsCh   chan string
    resultsCh chan ResultChannel
    stopCh    chan struct{}
    wg        sync.WaitGroup
}

func NewBatchClientV2(batchSize int, interval time.Duration) *BatchClientV2 {
    return &BatchClientV2{
        batchSize:  batchSize,
        interval:   interval,
        maxRetries: 3,
        itemsCh:    make(chan string, batchSize*2),
        resultsCh:  make(chan ResultChannel, 1),
    }
}

func (c *BatchClientV2) sendBatchWithRetry(ctx context.Context, batch []string) error {
    var lastErr error
    
    for attempt := 0; attempt < c.maxRetries; attempt++ {
        if err := c.callService(ctx, batch); err == nil {
            return nil
        } else {
            lastErr = err
            // Exponential backoff
            backoff := time.Duration(1<<uint(attempt)) * 100 * time.Millisecond
            select {
            case <-time.After(backoff):
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }
    
    return lastErr
}

func (c *BatchClientV2) Results() <-chan ResultChannel {
    return c.resultsCh
}

Анализ решения

Ключевые особенности:

  1. Non-blocking Process - использует буферизированный канал, клиент не ждет
  2. Две условия отправки:
    • Батч полный (размер == batchSize)
    • Таймер сработал (прошел interval)
  3. Graceful shutdown - закрытие канала сигнализирует об окончании работы
  4. Обработка контекста - отмена через context.Done()
  5. Синхронизация - WaitGroup гарантирует завершение работника

Time & Space Complexity

  • Process: O(1) - одна отправка в канал
  • sendBatch: O(n) где n = batchSize
  • Memory: O(batchSize) - буфер для батча

Реальное использование

client := NewBatchClient(100, 5*time.Second)
client.Start(ctx)
defer client.Stop(10*time.Second)

for _, order := range orders {
    client.Process(ctx, []string{order})
}

Альтернативы

  1. uber/go-torch - готовая библиотека для батчей
  2. Kafka/RabbitMQ - для более сложных сценариев
  3. Go sync.Pool - если нужна переиспользование батчей