← Назад к вопросам

Что такое Job?

2.3 Middle🔥 201 комментариев
#Spring Framework

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Что такое Job

Job — это основная единица работы в фреймворке Spring Batch, предназначенная для обработки больших объемов данных в批处理 режиме (batch processing). Job определяет последовательность шагов (Steps), которые должны быть выполнены для обработки данных.

В контексте Spring Batch, Job — это конфигурация, описывающая как, когда и в каком порядке должны выполняться операции обработки данных.

Основная архитектура Spring Batch

Job
  ├── Step 1 (Read from file)
  │   ├── ItemReader
  │   ├── ItemProcessor
  │   └── ItemWriter
  │
  ├── Step 2 (Validation)
  │   ├── ItemReader
  │   ├── ItemProcessor
  │   └── ItemWriter
  │
  └── Step 3 (Save to database)
      ├── ItemReader
      ├── ItemProcessor
      └── ItemWriter

Компоненты Job

1. Job — основной контейнер

Определяет общую структуру и имя задачи

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    
    @Bean
    public Job importUserJob(JobBuilderFactory jobBuilderFactory, Step step1) {
        return jobBuilderFactory.get("importUserJob")
            .start(step1)
            .build();
    }
}

2. Step — шаги в Job

Каждый Step представляет логическую единицу работы

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
                  ItemReader<User> reader,
                  ItemProcessor<User, User> processor,
                  ItemWriter<User> writer) {
    return stepBuilderFactory.get("step1")
        .<User, User>chunk(10)  // Обработка 10 записей за раз
        .reader(reader)
        .processor(processor)
        .writer(writer)
        .build();
}

Полный пример: Импорт пользователей из CSV

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.*;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

// 1. Модель данных
public class User {
    private String firstName;
    private String lastName;
    private String email;
    
    // Getters and Setters
}

// 2. ItemReader - чтение данных из CSV
@Component
public class UserItemReader {
    @Bean
    public FlatFileItemReader<User> reader() {
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("users.csv"));
        reader.setLineMapper(new DefaultLineMapper<User>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames("firstName", "lastName", "email");
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<User>() {{
                setTargetType(User.class);
            }});
        }});
        return reader;
    }
}

// 3. ItemProcessor - обработка данных
@Component
public class UserItemProcessor implements ItemProcessor<User, User> {
    @Override
    public User process(User user) throws Exception {
        // Валидация и трансформация
        if (user.getEmail() == null || user.getEmail().isEmpty()) {
            throw new ValidationException("Email cannot be empty");
        }
        
        // Нормализация
        user.setFirstName(user.getFirstName().toUpperCase());
        user.setLastName(user.getLastName().toUpperCase());
        
        return user;
    }
}

// 4. ItemWriter - запись данных в БД
@Component
public class UserItemWriter implements ItemWriter<User> {
    @Autowired
    private UserRepository userRepository;
    
    @Override
    public void write(List<? extends User> users) throws Exception {
        // Сохраняем пакет пользователей
        userRepository.saveAll(users);
        System.out.println("Saved " + users.size() + " users");
    }
}

// 5. Конфигурация Job
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    
    @Autowired
    private UserItemReader userItemReader;
    
    @Autowired
    private UserItemProcessor processor;
    
    @Autowired
    private UserItemWriter writer;
    
    // Определяем Step
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
            .<User, User>chunk(100)  // Обрабатываем 100 записей за раз
            .reader(userItemReader.reader())
            .processor(processor)
            .writer(writer)
            .build();
    }
    
    // Определяем Job
    @Bean
    public Job importUserJob(Step step1) {
        return jobBuilderFactory.get("importUserJob")
            .start(step1)
            .build();
    }
}

Запуск Job

// Вариант 1: Запуск через REST API
@RestController
@RequestMapping("/batch")
public class JobLauncherController {
    
    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    private Job importUserJob;
    
    @PostMapping("/import-users")
    public ResponseEntity<String> importUsers() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
            .addLong("time", System.currentTimeMillis())
            .toJobParameters();
        
        JobExecution execution = jobLauncher.run(importUserJob, jobParameters);
        
        return ResponseEntity.ok("Job started with ID: " + execution.getId());
    }
}

// Вариант 2: Запуск по расписанию
@Component
public class ScheduledJobLauncher {
    
    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    private Job importUserJob;
    
    // Запуск каждый день в полночь
    @Scheduled(cron = "0 0 0 * * ?")
    public void runImportUserJob() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
            .addDate("run.date", new Date())
            .toJobParameters();
        
        JobExecution execution = jobLauncher.run(importUserJob, jobParameters);
        System.out.println("Job completed with status: " + execution.getStatus());
    }
}

Продвинутые концепции

1. Множественные шаги (Multi-Step Job)

@Bean
public Job multiStepJob(Step step1, Step step2, Step step3) {
    return jobBuilderFactory.get("multiStepJob")
        .start(step1)
        .next(step2)
        .next(step3)
        .build();
}

2. Условная логика (Flow)

@Bean
public Job conditionalStepJob(Step step1, Step step2, Step step3) {
    return jobBuilderFactory.get("conditionalStepJob")
        .start(step1)
        .on("COMPLETED").to(step2)  // Если Step1 успешен, идем на Step2
        .on("FAILED").to(step3)      // Если Step1 не успешен, идем на Step3
        .end()
        .build();
}

3. Параллельная обработка (Parallel Steps)

@Bean
public Job parallelStepJob(Step step1, Step step2) {
    return jobBuilderFactory.get("parallelStepJob")
        .start(parallelFlow())
        .build();
}

@Bean
public Flow parallelFlow() {
    return new FlowBuilder<Flow>("parallelFlow")
        .split(new SimpleAsyncTaskExecutor())
        .add(flow1(), flow2())
        .build();
}

4. Обработка ошибок (Error Handling)

@Bean
public Step stepWithErrorHandling() {
    return stepBuilderFactory.get("stepWithErrorHandling")
        .<User, User>chunk(10)
        .reader(reader)
        .processor(processor)
        .writer(writer)
        .faultTolerant()  // Включаем обработку ошибок
        .skip(ValidationException.class)  // Пропускаем эту ошибку
        .skipLimit(10)  // Максимум 10 пропусков
        .retry(IOException.class)  // Повторяем попытку при IOE
        .retryLimit(3)  // Максимум 3 повтора
        .build();
}

5. Слушатели (Listeners)

// Слушатель Job
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
    
    @Autowired
    private UserRepository userRepository;
    
    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println("Job completed successfully");
            System.out.println("Total users imported: " + userRepository.count());
        }
    }
}

// Слушатель Step
@Component
public class StepExecutionLogger implements StepExecutionListener {
    
    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("Step started: " + stepExecution.getStepName());
    }
    
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("Step ended with " + stepExecution.getReadCount() + " reads");
        return ExitStatus.COMPLETED;
    }
}

Мониторинг Job

// Получение информации о выполненных Job
@Component
public class JobMonitor {
    
    @Autowired
    private JobRepository jobRepository;
    
    public void printJobStatistics() {
        List<JobInstance> instances = jobRepository.findJobInstancesByJobName("importUserJob", 0, 10);
        
        for (JobInstance instance : instances) {
            List<JobExecution> executions = jobRepository.findJobExecutions(instance);
            
            for (JobExecution execution : executions) {
                System.out.println("Job: " + execution.getJobInstance().getJobName());
                System.out.println("Status: " + execution.getStatus());
                System.out.println("Start Time: " + execution.getStartTime());
                System.out.println("End Time: " + execution.getEndTime());
                System.out.println("Read Count: " + execution.getStepExecutions().stream()
                    .mapToInt(StepExecution::getReadCount)
                    .sum());
            }
        }
    }
}

Пример: Реальный сценарий

// Job: Ежедневный импорт отчетов о продажах
@Configuration
public class SalesReportBatchJob {
    
    @Bean
    public Step readSalesFromAPI() {
        // Читаем данные из внешнего API
        return stepBuilderFactory.get("readSalesFromAPI")
            .<Sale, Sale>chunk(50)
            .reader(new ApiItemReader(apiClient))
            .processor(new SalesValidationProcessor())
            .writer(new SalesWriter(saleRepository))
            .build();
    }
    
    @Bean
    public Step aggregateSalesData() {
        // Агрегируем продажи по категориям
        return stepBuilderFactory.get("aggregateSalesData")
            .<Sale, SalesReport>chunk(100)
            .reader(new SalesReader(saleRepository))
            .processor(new SalesAggregationProcessor())
            .writer(new SalesReportWriter(reportRepository))
            .build();
    }
    
    @Bean
    public Step sendReportEmail() {
        // Отправляем отчет по email
        return stepBuilderFactory.get("sendReportEmail")
            .<SalesReport, SalesReport>chunk(10)
            .reader(new ReportReader(reportRepository))
            .processor(new EmailReportProcessor(emailService))
            .writer(new EmailWriter())
            .build();
    }
    
    @Bean
    public Job salesReportJob(Step readSalesFromAPI, 
                              Step aggregateSalesData, 
                              Step sendReportEmail) {
        return jobBuilderFactory.get("salesReportJob")
            .start(readSalesFromAPI)
            .next(aggregateSalesData)
            .next(sendReportEmail)
            .listener(new JobCompletionNotificationListener())
            .build();
    }
}

Жизненный цикл Job

1. STARTING → Job инициализируется
2. STARTED  → Job начинает выполняться
3. EXECUTING → Выполняются шаги (Steps)
4. STOPPING → Job получил сигнал остановки
5. STOPPED  → Job остановлен пользователем
6. COMPLETED → Job успешно завершен
7. FAILED   → Job завершился с ошибкой

Преимущества Spring Batch

  • Обработка больших объемов данных эффективно
  • Транзакционность и возможность rollback
  • Перезапуск неудачных Job
  • Параллелизм и распределение
  • Мониторинг и статистика
  • Простая интеграция с Spring экосистемой

Заключение

Job в Spring Batch — это основная единица для организации batch-обработки данных. Она позволяет структурировать сложные процессы обработки больших объемов информации с минимальными усилиями, обеспечивая надежность, транзакционность и мониторинг.

Что такое Job? | PrepBro