Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Что такое Job
Job — это основная единица работы в фреймворке Spring Batch, предназначенная для обработки больших объемов данных в批处理 режиме (batch processing). Job определяет последовательность шагов (Steps), которые должны быть выполнены для обработки данных.
В контексте Spring Batch, Job — это конфигурация, описывающая как, когда и в каком порядке должны выполняться операции обработки данных.
Основная архитектура Spring Batch
Job
├── Step 1 (Read from file)
│ ├── ItemReader
│ ├── ItemProcessor
│ └── ItemWriter
│
├── Step 2 (Validation)
│ ├── ItemReader
│ ├── ItemProcessor
│ └── ItemWriter
│
└── Step 3 (Save to database)
├── ItemReader
├── ItemProcessor
└── ItemWriter
Компоненты Job
1. Job — основной контейнер
Определяет общую структуру и имя задачи
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Bean
public Job importUserJob(JobBuilderFactory jobBuilderFactory, Step step1) {
return jobBuilderFactory.get("importUserJob")
.start(step1)
.build();
}
}
2. Step — шаги в Job
Каждый Step представляет логическую единицу работы
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
ItemReader<User> reader,
ItemProcessor<User, User> processor,
ItemWriter<User> writer) {
return stepBuilderFactory.get("step1")
.<User, User>chunk(10) // Обработка 10 записей за раз
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
Полный пример: Импорт пользователей из CSV
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.*;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
// 1. Модель данных
public class User {
private String firstName;
private String lastName;
private String email;
// Getters and Setters
}
// 2. ItemReader - чтение данных из CSV
@Component
public class UserItemReader {
@Bean
public FlatFileItemReader<User> reader() {
FlatFileItemReader<User> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("users.csv"));
reader.setLineMapper(new DefaultLineMapper<User>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames("firstName", "lastName", "email");
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<User>() {{
setTargetType(User.class);
}});
}});
return reader;
}
}
// 3. ItemProcessor - обработка данных
@Component
public class UserItemProcessor implements ItemProcessor<User, User> {
@Override
public User process(User user) throws Exception {
// Валидация и трансформация
if (user.getEmail() == null || user.getEmail().isEmpty()) {
throw new ValidationException("Email cannot be empty");
}
// Нормализация
user.setFirstName(user.getFirstName().toUpperCase());
user.setLastName(user.getLastName().toUpperCase());
return user;
}
}
// 4. ItemWriter - запись данных в БД
@Component
public class UserItemWriter implements ItemWriter<User> {
@Autowired
private UserRepository userRepository;
@Override
public void write(List<? extends User> users) throws Exception {
// Сохраняем пакет пользователей
userRepository.saveAll(users);
System.out.println("Saved " + users.size() + " users");
}
}
// 5. Конфигурация Job
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
private UserItemReader userItemReader;
@Autowired
private UserItemProcessor processor;
@Autowired
private UserItemWriter writer;
// Определяем Step
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<User, User>chunk(100) // Обрабатываем 100 записей за раз
.reader(userItemReader.reader())
.processor(processor)
.writer(writer)
.build();
}
// Определяем Job
@Bean
public Job importUserJob(Step step1) {
return jobBuilderFactory.get("importUserJob")
.start(step1)
.build();
}
}
Запуск Job
// Вариант 1: Запуск через REST API
@RestController
@RequestMapping("/batch")
public class JobLauncherController {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importUserJob;
@PostMapping("/import-users")
public ResponseEntity<String> importUsers() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobLauncher.run(importUserJob, jobParameters);
return ResponseEntity.ok("Job started with ID: " + execution.getId());
}
}
// Вариант 2: Запуск по расписанию
@Component
public class ScheduledJobLauncher {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importUserJob;
// Запуск каждый день в полночь
@Scheduled(cron = "0 0 0 * * ?")
public void runImportUserJob() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addDate("run.date", new Date())
.toJobParameters();
JobExecution execution = jobLauncher.run(importUserJob, jobParameters);
System.out.println("Job completed with status: " + execution.getStatus());
}
}
Продвинутые концепции
1. Множественные шаги (Multi-Step Job)
@Bean
public Job multiStepJob(Step step1, Step step2, Step step3) {
return jobBuilderFactory.get("multiStepJob")
.start(step1)
.next(step2)
.next(step3)
.build();
}
2. Условная логика (Flow)
@Bean
public Job conditionalStepJob(Step step1, Step step2, Step step3) {
return jobBuilderFactory.get("conditionalStepJob")
.start(step1)
.on("COMPLETED").to(step2) // Если Step1 успешен, идем на Step2
.on("FAILED").to(step3) // Если Step1 не успешен, идем на Step3
.end()
.build();
}
3. Параллельная обработка (Parallel Steps)
@Bean
public Job parallelStepJob(Step step1, Step step2) {
return jobBuilderFactory.get("parallelStepJob")
.start(parallelFlow())
.build();
}
@Bean
public Flow parallelFlow() {
return new FlowBuilder<Flow>("parallelFlow")
.split(new SimpleAsyncTaskExecutor())
.add(flow1(), flow2())
.build();
}
4. Обработка ошибок (Error Handling)
@Bean
public Step stepWithErrorHandling() {
return stepBuilderFactory.get("stepWithErrorHandling")
.<User, User>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant() // Включаем обработку ошибок
.skip(ValidationException.class) // Пропускаем эту ошибку
.skipLimit(10) // Максимум 10 пропусков
.retry(IOException.class) // Повторяем попытку при IOE
.retryLimit(3) // Максимум 3 повтора
.build();
}
5. Слушатели (Listeners)
// Слушатель Job
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
@Autowired
private UserRepository userRepository;
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
System.out.println("Job completed successfully");
System.out.println("Total users imported: " + userRepository.count());
}
}
}
// Слушатель Step
@Component
public class StepExecutionLogger implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("Step started: " + stepExecution.getStepName());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println("Step ended with " + stepExecution.getReadCount() + " reads");
return ExitStatus.COMPLETED;
}
}
Мониторинг Job
// Получение информации о выполненных Job
@Component
public class JobMonitor {
@Autowired
private JobRepository jobRepository;
public void printJobStatistics() {
List<JobInstance> instances = jobRepository.findJobInstancesByJobName("importUserJob", 0, 10);
for (JobInstance instance : instances) {
List<JobExecution> executions = jobRepository.findJobExecutions(instance);
for (JobExecution execution : executions) {
System.out.println("Job: " + execution.getJobInstance().getJobName());
System.out.println("Status: " + execution.getStatus());
System.out.println("Start Time: " + execution.getStartTime());
System.out.println("End Time: " + execution.getEndTime());
System.out.println("Read Count: " + execution.getStepExecutions().stream()
.mapToInt(StepExecution::getReadCount)
.sum());
}
}
}
}
Пример: Реальный сценарий
// Job: Ежедневный импорт отчетов о продажах
@Configuration
public class SalesReportBatchJob {
@Bean
public Step readSalesFromAPI() {
// Читаем данные из внешнего API
return stepBuilderFactory.get("readSalesFromAPI")
.<Sale, Sale>chunk(50)
.reader(new ApiItemReader(apiClient))
.processor(new SalesValidationProcessor())
.writer(new SalesWriter(saleRepository))
.build();
}
@Bean
public Step aggregateSalesData() {
// Агрегируем продажи по категориям
return stepBuilderFactory.get("aggregateSalesData")
.<Sale, SalesReport>chunk(100)
.reader(new SalesReader(saleRepository))
.processor(new SalesAggregationProcessor())
.writer(new SalesReportWriter(reportRepository))
.build();
}
@Bean
public Step sendReportEmail() {
// Отправляем отчет по email
return stepBuilderFactory.get("sendReportEmail")
.<SalesReport, SalesReport>chunk(10)
.reader(new ReportReader(reportRepository))
.processor(new EmailReportProcessor(emailService))
.writer(new EmailWriter())
.build();
}
@Bean
public Job salesReportJob(Step readSalesFromAPI,
Step aggregateSalesData,
Step sendReportEmail) {
return jobBuilderFactory.get("salesReportJob")
.start(readSalesFromAPI)
.next(aggregateSalesData)
.next(sendReportEmail)
.listener(new JobCompletionNotificationListener())
.build();
}
}
Жизненный цикл Job
1. STARTING → Job инициализируется
2. STARTED → Job начинает выполняться
3. EXECUTING → Выполняются шаги (Steps)
4. STOPPING → Job получил сигнал остановки
5. STOPPED → Job остановлен пользователем
6. COMPLETED → Job успешно завершен
7. FAILED → Job завершился с ошибкой
Преимущества Spring Batch
- Обработка больших объемов данных эффективно
- Транзакционность и возможность rollback
- Перезапуск неудачных Job
- Параллелизм и распределение
- Мониторинг и статистика
- Простая интеграция с Spring экосистемой
Заключение
Job в Spring Batch — это основная единица для организации batch-обработки данных. Она позволяет структурировать сложные процессы обработки больших объемов информации с минимальными усилиями, обеспечивая надежность, транзакционность и мониторинг.