Docker環境にて、Spring Batchを利用できるようにします。
以下の記事にて、Spring Boot環境を構築したので、同様に、Spring Batch環境を構築して、バッチ実行を行います。
上記の記事同様、Spring Initializrで作成したプロジェクトを、Docker上でビルド、起動します。
ビルドには、Gradle を使用します。
Spring Batchは、5.0(Spring Boot3.0)から、様々な変更が入り、実装方法が変更になりました。
2023年7月現在、検索エンジンには、Spring Batch4(Spring Boot2)以下での内容が多いので、ご注意ください。
環境
- Java:17
- Spring Boot:3.1.1
- Spring Batch:5.0.2
- MySQL:8.0
ディレクトリ構成
以下のディレクトリ構成としています。
.
├── docker-compose.yml
├── infra
│ ├── mysql
│ │ ├── Dockerfile
│ │ └── my.cnf
└── workspace (Spring Batchプロジェクトのディレクトリ)
Docker環境
下記の記事で構築した方法と同様です。
Spring Batchのプロジェクトを作成
上記の記事と同様に、Spring Initializrを利用しています。
バッチプロジェクトのため、依存関係は「Spring Web」と「Thymeleaf」を除き、「Spring Batch」を追加しました。
バッチ処理を実装
Tasklet(タスクレット)モデルでの実装
まず、Tasklet(タスクレット)モデルでの実装です。
Configの実装
まず、Spring Batchの設定に当たる、Configの実装です。
package com.example.demo.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import com.example.demo.tasklet.SampleTasklet;
@Configuration
public class BatchConfig {
@Bean
public Job taskletJob(JobRepository jobRepository, Step sampleStep) {
return new JobBuilder("taskletJob", jobRepository)
.start(sampleStep)
.build();
}
@Bean
public Step sampleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.tasklet(new SampleTasklet(), transactionManager)
.allowStartIfComplete(true)
.build();
}
}
JobとStepの定義を行います。
この辺りは、JobBuilderFactoryとStepBuilderFactory が非推奨になるなど、Spring Batch5になって変更になった部分です。
詳しくは以下の公式ドキュメントをご覧ください。
▼公式ドキュメント
https://spring.pleiades.io/spring-batch/docs/current/reference/html/whatsnew.html
▼マイグレーションガイド(英語)
https://github.com/spring-projects/spring-batch/wiki/Spring-Batch-5.0-Migration-Guide
Taskletの実装
続いて、Taskletの実装です。
「Hello, World!」を出力するだけのプログラムです。
package com.example.demo.tasklet;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;
@Component
@StepScope
public class SampleTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello, World!");
return RepeatStatus.FINISHED;
}
}
ビルド&実行
これで、プログラムは完成なので、ビルド&実行します。
./gradlew bootrun
実行すると、コンソールの出力は以下のようになり、Taskletで実装した「Hello, World!」が出力されるのが確認できました。
INFO 6140 --- [ restartedMain] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=taskletJob]] launched with the following parameters: [{}]
INFO 6140 --- [ restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [sampleStep]
Hello, World!
INFO 6140 --- [ restartedMain] o.s.batch.core.step.AbstractStep : Step: [sampleStep] executed in 21ms
6140 --- [ restartedMain] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=taskletJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 43ms
Chunk(チャンク)モデルでの実装
続いて、Chunk(チャンク)モデルでの実装です。
Configの実装
Configは、上記のTaskletモデルの設定に追記しました。
package com.example.demo.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.RepositoryItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;
import com.example.demo.dto.UserDto;
import com.example.demo.entity.User;
import com.example.demo.listener.JobCompletionNotificationListener;
import com.example.demo.processor.UserItemProcessor;
import com.example.demo.repository.UserRepository;
import com.example.demo.tasklet.SampleTasklet;
@Configuration
public class BatchConfig {
(中略)
@Bean
public Job importUserJob(
JobRepository jobRepository,
JobCompletionNotificationListener listener,
Step importUserStep) {
return new JobBuilder("importUserJob", jobRepository)
.listener(listener)
.start(importUserStep)
.build();
}
@Bean
public Step importUserStep(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<UserDto> reader,
UserItemProcessor processor,
ItemWriter<User> writer) {
return new StepBuilder("importUserStep", jobRepository)
.<UserDto, User> chunk(10, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.allowStartIfComplete(true)
.build();
}
@Bean
public FlatFileItemReader<UserDto> reader() {
return new FlatFileItemReaderBuilder<UserDto>()
.name("userItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names(new String[]{"lastName", "firstName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<UserDto>() {{
setTargetType(UserDto.class);
}})
.build();
}
@Bean
public ItemWriter<User> writer(UserRepository userRepository) {
RepositoryItemWriter<User> repositoryItemWriter = new RepositoryItemWriter<>();
repositoryItemWriter.setRepository(userRepository);
repositoryItemWriter.setMethodName("save");
return repositoryItemWriter;
}
}
JobとStepの定義を行います。
Chunkモデルのため、ReaderやWriterも定義しています。
Processorの定義については後述します。
Readerの実装
続いて、Readerの実装です。
本記事では、「src/main/resources」に「sample-data.csv」を置いて、それを読み込んでみます。
山田,太郎
鈴木,一郎
田中,二郎
後藤,三郎
三浦,四郎
山本,
名字と名前が並んでいるCSVとしました。
最終行は名字のみ、の例です。
Readerの実装といっても、上記でConfigで定義した内容で、Readerの実装は不要となっています。
下記で読み込むCSVの名前と内容を定義して、読み込んでいます。
public FlatFileItemReader<UserDto> reader() {
return new FlatFileItemReaderBuilder<UserDto>()
.name("userItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names(new String[]{"lastName", "firstName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<UserDto>() {{
setTargetType(UserDto.class);
}})
.build();
}
読み込んだCSVの内容を保存するdtoを以下のように用意しました。
lombokを使っているので、プロパティの定義のみです。
package com.example.demo.dto;
import lombok.Data;
@Data
public class UserDto {
private String lastName;
private String firstName;
}
Processorの実装
続いて、Processorの実装です。
読み込んだCSVの内容から、名字と名前が両方設定されていれば、有効(validをtrue)とするようにしています。
package com.example.demo.processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import com.example.demo.dto.UserDto;
import com.example.demo.entity.User;
@Component
public class UserItemProcessor implements ItemProcessor<UserDto, User> {
private static final Logger log = LoggerFactory.getLogger(UserItemProcessor.class);
@Override
public User process(UserDto userDto) throws Exception {
User user = new User();
BeanUtils.copyProperties(userDto, user);
if (!ObjectUtils.isEmpty(userDto.getLastName()) && !ObjectUtils.isEmpty(userDto.getFirstName())) {
user.setValid(true);
}
log.info(user + "");
return user;
}
}
Writerの実装
続いて、Writerの実装です。
WriterもReaderと同様、Configで定義した内容で実装は不要となっています。
リポジトリを指定して、DBに保存しています。
@Bean
public ItemWriter<User> writer(UserRepository userRepository) {
RepositoryItemWriter<User> repositoryItemWriter = new RepositoryItemWriter<>();
repositoryItemWriter.setRepository(userRepository);
repositoryItemWriter.setMethodName("save");
return repositoryItemWriter;
}
以下がリポジトリとモデルの実装です。
package com.example.demo.entity;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.Data;
@Entity
@Data
@Table(name = "users")
public class User {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id;
@Column(nullable = false)
private String lastName;
@Column(nullable = false)
private String firstName;
private boolean valid = false;
}
package com.example.demo.repository;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import com.example.demo.entity.User;
@Repository
public interface UserRepository extends JpaRepository<User, Integer> {}
Listenerの実装
最後に、Listenerの実装です。
直接関係ありませんが、例として記載します。
Jobの設定で、Listenerを指定しています。
ListenerではJobの終了後に、DBを検索して、内容をログに出力する処理を行っています。
package com.example.demo.listener;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;
import com.example.demo.entity.User;
import com.example.demo.repository.UserRepository;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
@Component
@RequiredArgsConstructor
public class JobCompletionNotificationListener implements JobExecutionListener {
private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
@NonNull
private final UserRepository userRepository;
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! JOB FINISHED! Time to verify the results");
List<User> userList = userRepository.findAll();
userList.forEach(user -> log.info(user + ""));
}
}
}
ビルド&実行
これで、プログラムは完成なので、ビルド&実行します。
複数のJobがある時にJobを指定しないで実行すると、「Job name must be specified in case of multiple jobs」というエラーになるので、指定して実行します。
./gradlew bootrun --args='--spring.batch.job.name=importUserJob'
実行すると、コンソールの出力は以下のようになります。
まず、Processorで加工した結果をログに出力しているのと、ListenerでDB検索した結果を出力しているのが確認できます。
INFO 6705 --- [ restartedMain] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=importUserJob]] launched with the following parameters: [{}]
INFO 6705 --- [ restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [importUserStep]
INFO 6705 --- [ restartedMain] c.e.demo.processor.UserItemProcessor : User(id=0, lastName=山田, firstName=太郎, valid=true)
INFO 6705 --- [ restartedMain] c.e.demo.processor.UserItemProcessor : User(id=0, lastName=鈴木, firstName=一郎, valid=true)
INFO 6705 --- [ restartedMain] c.e.demo.processor.UserItemProcessor : User(id=0, lastName=田中, firstName=二郎, valid=true)
INFO 6705 --- [ restartedMain] c.e.demo.processor.UserItemProcessor : User(id=0, lastName=後藤, firstName=三郎, valid=true)
INFO 6705 --- [ restartedMain] c.e.demo.processor.UserItemProcessor : User(id=0, lastName=三浦, firstName=四郎, valid=true)
INFO 6705 --- [ restartedMain] c.e.demo.processor.UserItemProcessor : User(id=0, lastName=山本, firstName=, valid=false)
INFO 6705 --- [ restartedMain] o.s.batch.core.step.AbstractStep : Step: [importUserStep] executed in 134ms
INFO 6705 --- [ restartedMain] .e.d.l.JobCompletionNotificationListener : !!! JOB FINISHED! Time to verify the results
INFO 6705 --- [ restartedMain] .e.d.l.JobCompletionNotificationListener : User(id=1, lastName=山田, firstName=太郎, valid=true)
INFO 6705 --- [ restartedMain] .e.d.l.JobCompletionNotificationListener : User(id=2, lastName=鈴木, firstName=一郎, valid=true)
INFO 6705 --- [ restartedMain] .e.d.l.JobCompletionNotificationListener : User(id=3, lastName=田中, firstName=二郎, valid=true)
INFO 6705 --- [ restartedMain] .e.d.l.JobCompletionNotificationListener : User(id=4, lastName=後藤, firstName=三郎, valid=true)
INFO 6705 --- [ restartedMain] .e.d.l.JobCompletionNotificationListener : User(id=5, lastName=三浦, firstName=四郎, valid=true)
INFO 6705 --- [ restartedMain] .e.d.l.JobCompletionNotificationListener : User(id=6, lastName=山本, firstName=, valid=false)
INFO 6705 --- [ restartedMain] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=importUserJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 157ms
DBを直接確認しても、下記のようにデータが登録されているのが確認できました。