В этой статье основное внимание уделяется объяснению принципов! Если вы хотите увидеть размер партии Обработка данных Практический бой, пожалуйста.сосредоточиться ниже (пожалуйста, добавьте больше позже) на):
Spring Batch — это платформа пакетной обработки с открытым исходным кодом на основе Java, предназначенная для обработки крупномасштабных, повторяющихся и высоконадежных задач. Он обеспечивает простой и мощный способ выполнения заданий пакетной обработки, таких как импорт/экспорт данных, создание отчетов, пакетная обработка и т. д.
Что такое весенняя партия?
Spring Batch предназначен для упрощения разработки и управления пакетными заданиями. Он предоставляет расширяемую модель для определения и выполнения пакетных заданий, разделяя задание на несколько шагов (шагов), каждый из которых состоит из одного или нескольких блоков задач (чанков). Используя Spring Batch, можно легко обрабатывать большие объемы данных и сложную бизнес-логику.
Особенности и преимущества Spring Batch
1. Установите и настройте Spring Batch.
Сначала убедитесь, что ваша среда разработки Java установлена и настроена. Затем вы можете использовать инструмент сборки, такой как Maven или Gradle, чтобы добавить зависимости Spring Batch в ваш проект. Подробную информацию об установке и настройке см. в официальной документации Spring Batch.
2. Создайте первое пакетное задание.
В Spring Batch пакетное задание состоит из одного или нескольких шагов, а каждый шаг состоит из одного или нескольких блоков задач. Вот простой пример, показывающий, как создать простое пакетное задание:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
System.out.println("Hello, Spring Batch!");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Job job(Step step1) {
return jobBuilderFactory.get("job")
.start(step1)
.build();
}
}
Первое использование@Configuration
и@EnableBatchProcessing
Аннотации помечают классы какSpring Класс пакетной конфигурации. Затем,использоватьJobBuilderFactory
иStepBuilderFactory
Создать работуистроитель шагов。существоватьstep1
в методе,Определяет простой блок задач,напечатать «Здравствуйте! Spring Batch!"и вернутьсяRepeatStatus.FINISHED
。наконец,существоватьjob
в методе,использоватьjobBuilderFactory
Создать работу,и будетstep1
В качестве стартового шага к работе。
3. Понимание блоков заданий, шагов и задач.
Чтение и запись данных:Spring BatchПредоставляет разнообразныечитатьиписатьданныеспособ。МожетиспользоватьItemReader
читатьданные,Например изданные Библиотека、очередь файлов или сообщенийсерединачитатьданные。ЗатемиспользоватьItemWriter
будет обработаноданные Написать цель,нравитьсяданные Библиотекаповерхность、очередь файлов или сообщений。
Во-первых, нам нужно определить модель данных для представления информации об учащихся, например
public class Student {
private String name;
private int score;
// Getters and setters
// ...
}
Далее мы можем использовать Spring BatchпредоставилFlatFileItemReader
ПриходитьчитатьCSVв файледанные:
@Bean
public FlatFileItemReader<Student> studentItemReader() {
FlatFileItemReader<Student> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("students.csv"));
reader.setLineMapper(new DefaultLineMapper<Student>() {
{
setLineTokenizer(new DelimitedLineTokenizer() {
{
setNames(new String[] { "name", "score" });
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {
{
setTargetType(Student.class);
}
});
}
});
return reader;
}
Поддерживаемые форматы данных и источники данных
Преобразование и проверка данных
Spring Пакетные предложения и проверка данныхмеханизм。МожетиспользоватьItemProcessor
верночитатьизданные Сделать преобразование、фильтрипроверять。ItemProcessor
Пользовательскую бизнес-логику можно применять для обработки каждогоданныеэлемент。
Мы настроилиFlatFileItemReader
,Установите расположение и сопоставление строк для файлов CSV.,Указывает разделители полей и сопоставления атрибутов поля и модели.
Следующий,Мы можем определитьItemProcessor
Приходитьверночитатьиз学生信息Преобразуйте и проверьте:
@Bean
public ItemProcessor<Student, Student> studentItemProcessor() {
return new ItemProcessor<Student, Student>() {
@Override
public Student process(Student student) throws Exception {
// Преобразуйте и проверьте
if (student.getScore() < 0) {
// Если проверка не удалась, выдается исключение.
throw new IllegalArgumentException("Invalid score for student: " + student.getName());
}
// Операции преобразования, такие как преобразование дробей в проценты.
int percentage = student.getScore() * 10;
student.setScore(percentage);
return student;
}
};
}
В приведенном выше коде,Мы определилиItemProcessor
,Преобразование контрольной суммы информации об учениках. Если балл ученика меньше 0,в противном случае выдает исключение;,Преобразуйте дроби в проценты.
Наконец, мы можем использовать Spring BatchпредоставилJdbcBatchItemWriter
будет обработано学生信息писатьданные Библиотека:
@Bean
public JdbcBatchItemWriter<Student> studentItemWriter(DataSource dataSource) {
JdbcBatchItemWriter<Student> writer = new JdbcBatchItemWriter<>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("INSERT INTO students (name, score) VALUES (:name, :score)");
writer.setDataSource(dataSource);
return writer;
}
Планирование и контроль работы
Конфигурация планировщика заданий:Spring Пакетная служба предоставляет планировщик заданий для настройки и управления выполнением пакетных заданий. Задания можно планировать с помощью среды планирования Spring (например, Quartz) или инструмента планирования операционной системы (например, cron). Настраивая планировщик заданий, вы можете установить время запуска задания, частоту и другие параметры планирования.
В приведенном выше коде,Мы настроилиJdbcBatchItemWriter
,Настройка операторов SQL и источников данных,Пакетно вставляйте обработанную информацию о студентах в таблицу базы данных данных.
Наконец, нам нужно настроить шаг задания для сборки процесса чтения, обработки и записи данных:
@Bean
public Step processStudentStep(ItemReader<Student> reader, ItemProcessor<Student, Student> processor, ItemWriter<Student> writer) {
return stepBuilderFactory.get("processStudentStep")
.<Student, Student>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
В приведенном выше коде,насиспользоватьstepBuilderFactory
создал шаг,И указаны считыватели, процессоры и записи данных.
Мониторинг и управление выполнением работ:Spring Пакетная обработка обеспечивает богатые функции мониторинга и управления. Можно использовать пружину Интерфейс пакетного управления и API для мониторинга состояния выполнения заданий, показателей прогресса и производительности. Вы также можете использовать механизмы ведения журнала, уведомлений и сигналов тревоги для своевременного получения информации о состоянии выполнения задания и исключениях.
Наконец, мы можем настроить задание для планирования этого шага:
@Bean
public Job processStudentJob(JobBuilderFactory jobBuilderFactory, Step processStudentStep) {
return jobBuilderFactory.get("processStudentJob")
.flow(processStudentStep)
.end()
.build();
}
насиспользоватьjobBuilderFactory
Создал работу,и указанные шаги для выполнения.
В приведенном выше примере мы продемонстрировали Spring Batchсередина Чтение и запись данныхспособ,использовать ПонятноFlatFileItemReader
читатьCSVдокумент,использовать ПонятноJdbcBatchItemWriter
будет обработано学生信息писатьданные Библиотека。в то же время,насиспользовать ПонятноItemProcessor
верночитатьиз学生信息Преобразуйте и пожалуйста. Этот пример также показывает, как Spring Поддержка пакетной обработки различных источников данных и форматов данных, а также способы настройки и объединения этапов задания для выполнения всей задачи пакетной обработки.
Обработка ошибок и механизм повтора
Spring Пакетные предложения Обработка ошибок и механизм повтора для обеспечения стабильности и надежности пакетных заданий. Политики можно настроить для обработки ошибок и исключений во время чтения, обработки и записи. Количество повторных попыток, интервалы повторных попыток и стратегии обработки ошибок можно настроить для адаптации к различным сценариям возникновения ошибок и потребностям.
первый,Мы можем установить стратегию обработки ошибок в конфигурации шага. Например,нас МожетиспользоватьSkipPolicy
пропустить определенные исключения,илииспользоватьRetryPolicy
попробовать еще раз。
@Bean
public Step processStudentStep(ItemReader<Student> reader, ItemProcessor<Student, Student> processor, ItemWriter<Student> writer) {
return stepBuilderFactory.get("processStudentStep")
.<Student, Student>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skip(Exception.class)
.skipLimit(10)
.retry(Exception.class)
.retryLimit(3)
.build();
}
насиспользоватьfaultTolerant()
Способ включения политики обработки ошибок。Затем,использоватьskip(Exception.class)
Укажите, чтобы пропустить определенные исключения,использоватьskipLimit(10)
设置跳过из最大Второсортный Число10Второсортный。в то же время,использоватьretry(Exception.class)
Укажите повторную попытку определенных исключений,использоватьretryLimit(3)
Установите максимальное количество повторов3Второсортный。
По умолчанию, если во время чтения, обработки или записи возникает исключение, Spring Batch помечает элемент как элемент ошибки и пытается пропустить или повторить попытку, пока не будет достигнуто максимальное количество пропусков или повторов.
также,Вы также можете настроить обработчики ошибок для каждого шага.,Настройте логику обработки элементов ошибок. Например,МожетиспользоватьSkipListener
Приходить处理跳过изэлемент,использоватьRetryListener
Приходить处理重试изэлемент。
@Bean
public SkipListener<Student, Student> studentSkipListener() {
return new SkipListener<Student, Student>() {
@Override
public void onSkipInRead(Throwable throwable) {
// Обработка исключений, возникающих во время чтения
}
@Override
public void onSkipInWrite(Student student, Throwable throwable) {
// Обработка исключений, возникающих во время записи
}
@Override
public void onSkipInProcess(Student student, Throwable throwable) {
// Обработка исключений, возникающих во время обработки.
}
};
}
@Bean
public RetryListener studentRetryListener() {
return new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
// Логика выполнения перед повторной попыткой
return true;
}
@Override
public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
// Обработка исключений, возникающих во время повторных попыток
}
@Override
public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
// Логика, которая будет выполнена после повторной попытки
}
};
}
@Bean
public Step processStudentStep(ItemReader<Student> reader, ItemProcessor<Student, Student> processor, ItemWriter<Student> writer,
SkipListener<Student, Student> skipListener, RetryListener retryListener) {
return stepBuilderFactory.get("processStudentStep")
.<Student, Student>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skip(Exception.class)
.skipLimit(10)
.retry(Exception.class)
.retryLimit(3)
.listener(skipListener)
.listener(retryListener)
.build();
}
Рекомендации по пакетной обработке
Spring Пакетная обработка предоставляет множество точек расширения через Пользовательские. считыватели, писатели и процессоры и другие компоненты для расширения и настройки функциональности пакетных заданий.
public class MyItemReader implements ItemReader<String> {
private List<String> data = Arrays.asList("item1", "item2", "item3");
private Iterator<String> iterator = data.iterator();
@Override
public String read() throws Exception {
if (iterator.hasNext()) {
return iterator.next();
} else {
return null;
}
}
}
Пользовательский писатель:
public class MyItemWriter implements ItemWriter<String> {
@Override
public void write(List<? extends String> items) throws Exception {
for (String item : items) {
// Пользовательская логика записи
}
}
}
Пользовательский процессор:
public class MyItemProcessor implements ItemProcessor<String, String> {
@Override
public String process(String item) throws Exception {
// Пользовательская логика обработки
return item.toUpperCase();
}
}
Параллельная обработка пакетных заданий:
Spring Batch поддерживает разделение заданий пакетной обработки на несколько независимых шагов и достижение параллельной обработки посредством многопоточности или распределенной обработки.
Многопоточная обработка. Многопоточная обработка может быть достигнута путем настройки TaskExecutor. Используя TaskExecutor, каждый шаг может выполняться в отдельном потоке, что обеспечивает параллельную обработку.
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
return executor;
}
@Bean
public Step myStep(ItemReader<String> reader, ItemProcessor<String, String> processor, ItemWriter<String> writer) {
return stepBuilderFactory.get("myStep")
.<String, String>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.taskExecutor(taskExecutor())
.build();
}
В приведенном выше коде,мы проходимtaskExecutor()
Метод определяет исполнителя задачи пула потоков.,и будетнастроить его на шагсерединаизtaskExecutor()
в методе。
Распределенная обработка. Если требуется более высокий параллелизм и масштабируемость, рассмотрите возможность использования распределенной обработки. Spring Batch обеспечивает интеграцию с такими проектами, как Spring Integration и Spring Cloud Task, для обеспечения распределенного развертывания и обработки.
Сначала вам необходимо настроить канал сообщений и адаптер Spring Integration в пакетном задании Spring. Вы можете использовать каналы сообщений для отправки и получения входных и выходных данных задания, а также адаптеры для взаимодействия с внешними системами.
@Configuration
@EnableBatchProcessing
@EnableIntegration
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private MyItemReader reader;
@Autowired
private MyItemProcessor processor;
@Autowired
private MyItemWriter writer;
@Bean
public IntegrationFlow myJobFlow() {
return IntegrationFlows.from("jobInputChannel")
.handle(jobLaunchingGateway())
.get();
}
@Bean
public MessageChannel jobInputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel jobOutputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel stepInputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel stepOutputChannel() {
return new DirectChannel();
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
return new JobLaunchingGateway(jobLauncher);
}
@Bean
public JobRepository jobRepository() {
// Настроить хранилище заданий
}
@Bean
public Job myJob() {
return jobBuilderFactory.get("myJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.inputChannel(stepInputChannel())
.outputChannel(stepOutputChannel())
.build();
}
}
В приведенном выше коде мы настраиваем Spring Batch作业из消息通道иадаптер。myJobFlow()
метод определяет процесс интеграции,Он назван отjobInputChannel
Канал сообщений получает запросы на работу,и пройтиjobLaunchingGateway()
Способ начать работу。jobLaunchingGateway()
метод создаетJobLaunchingGateway
Пример,Используется для запуска задания.
Во-первых, вам необходимо настроить средство запуска задач и прослушиватель задач Spring Cloud Task в пакетном задании Spring. Средство запуска задач используется для запуска распределенных задач и управления ими, а прослушиватель задач — для выполнения некоторых операций во время выполнения задач.
@Configuration
@EnableBatchProcessing
@EnableTask
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private MyItemReader reader;
@Autowired
private MyItemProcessor processor;
@Autowired
private MyItemWriter writer;
@Bean
public TaskConfigurer taskConfigurer() {
return new DefaultTaskConfigurer();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}
@Bean
public Job myJob() {
return jobBuilderFactory.get("myJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public TaskListener myTaskListener() {
return new MyTaskListener();
}
@Bean
public TaskExecutionListener myTaskExecutionListener() {
return new MyTaskExecutionListener();
}
}