DockerでSpring Batchを実行する

DockerでSpring Batchを実行する Java

Docker環境にて、Spring Batchを利用できるようにします。
以下の記事にて、Spring Boot環境を構築したので、同様に、Spring Batch環境を構築して、バッチ実行を行います。

上記の記事同様、Spring Initializrで作成したプロジェクトを、Docker上でビルド、起動します。
ビルドには、Gradle を使用します。

Spring Batchは、5.0(Spring Boot3.0)から、様々な変更が入り、実装方法が変更になりました。
2023年7月現在、検索エンジンには、Spring Batch4(Spring Boot2)以下での内容が多いので、ご注意ください。

環境

  • Java:17
  • Spring Boot:3.1.1
  • Spring Batch:5.0.2
  • MySQL:8.0

ディレクトリ構成

以下のディレクトリ構成としています。

.
├── docker-compose.yml
├── infra
│   ├── mysql
│   │   ├── Dockerfile
│   │   └── my.cnf
└── workspace (Spring Batchプロジェクトのディレクトリ)

Docker環境

下記の記事で構築した方法と同様です。

Spring Batchのプロジェクトを作成

上記の記事と同様に、Spring Initializrを利用しています。
バッチプロジェクトのため、依存関係は「Spring Web」と「Thymeleaf」を除き、「Spring Batch」を追加しました。

バッチ処理を実装

Tasklet(タスクレット)モデルでの実装

まず、Tasklet(タスクレット)モデルでの実装です。

Configの実装

まず、Spring Batchの設定に当たる、Configの実装です。

package com.example.demo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import com.example.demo.tasklet.SampleTasklet;

@Configuration
public class BatchConfig {

    @Bean
    public Job taskletJob(JobRepository jobRepository, Step sampleStep) {
        return new JobBuilder("taskletJob", jobRepository)
                    .start(sampleStep)
                    .build();
    }

    @Bean
    public Step sampleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {

        return new StepBuilder("sampleStep", jobRepository)
                    .tasklet(new SampleTasklet(), transactionManager)
                    .allowStartIfComplete(true)
                    .build();
    }
}

JobとStepの定義を行います。
この辺りは、JobBuilderFactoryとStepBuilderFactory が非推奨になるなど、Spring Batch5になって変更になった部分です。
詳しくは以下の公式ドキュメントをご覧ください。

▼公式ドキュメント
https://spring.pleiades.io/spring-batch/docs/current/reference/html/whatsnew.html
▼マイグレーションガイド(英語)
https://github.com/spring-projects/spring-batch/wiki/Spring-Batch-5.0-Migration-Guide

Taskletの実装

続いて、Taskletの実装です。
「Hello, World!」を出力するだけのプログラムです。

package com.example.demo.tasklet;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;

@Component
@StepScope
public class SampleTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("Hello, World!");
        return RepeatStatus.FINISHED;
    }
}

ビルド&実行

これで、プログラムは完成なので、ビルド&実行します。

./gradlew bootrun

実行すると、コンソールの出力は以下のようになり、Taskletで実装した「Hello, World!」が出力されるのが確認できました。

INFO 6140 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=taskletJob]] launched with the following parameters: [{}]
INFO 6140 --- [  restartedMain] o.s.batch.core.job.SimpleStepHandler     : Executing step: [sampleStep]
Hello, World!
INFO 6140 --- [  restartedMain] o.s.batch.core.step.AbstractStep         : Step: [sampleStep] executed in 21ms
6140 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=taskletJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 43ms

Chunk(チャンク)モデルでの実装

続いて、Chunk(チャンク)モデルでの実装です。

Configの実装

Configは、上記のTaskletモデルの設定に追記しました。

package com.example.demo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.RepositoryItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import com.example.demo.dto.UserDto;
import com.example.demo.entity.User;
import com.example.demo.listener.JobCompletionNotificationListener;
import com.example.demo.processor.UserItemProcessor;
import com.example.demo.repository.UserRepository;
import com.example.demo.tasklet.SampleTasklet;

@Configuration
public class BatchConfig {
    (中略)

    @Bean
    public Job importUserJob(
        JobRepository jobRepository,
        JobCompletionNotificationListener listener,
        Step importUserStep) {

        return new JobBuilder("importUserJob", jobRepository)
            .listener(listener)
            .start(importUserStep)
            .build();
    }

    @Bean
    public Step importUserStep(
        JobRepository jobRepository,
        PlatformTransactionManager transactionManager,
        ItemReader<UserDto> reader,
        UserItemProcessor processor,
        ItemWriter<User> writer) {

        return new StepBuilder("importUserStep", jobRepository)
            .<UserDto, User> chunk(10, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .allowStartIfComplete(true)
            .build();
    }

    @Bean
    public FlatFileItemReader<UserDto> reader() {
        return new FlatFileItemReaderBuilder<UserDto>()
            .name("userItemReader")
            .resource(new ClassPathResource("sample-data.csv"))
            .delimited()
            .names(new String[]{"lastName", "firstName"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<UserDto>() {{
                setTargetType(UserDto.class);
            }})
            .build();
    }

    @Bean
    public ItemWriter<User> writer(UserRepository userRepository) {
        RepositoryItemWriter<User> repositoryItemWriter = new RepositoryItemWriter<>();
        repositoryItemWriter.setRepository(userRepository);
        repositoryItemWriter.setMethodName("save");

        return repositoryItemWriter;
    }
}

JobとStepの定義を行います。
Chunkモデルのため、ReaderやWriterも定義しています。
Processorの定義については後述します。

Readerの実装

続いて、Readerの実装です。
本記事では、「src/main/resources」に「sample-data.csv」を置いて、それを読み込んでみます。

山田,太郎
鈴木,一郎
田中,二郎
後藤,三郎
三浦,四郎
山本,

名字と名前が並んでいるCSVとしました。
最終行は名字のみ、の例です。

Readerの実装といっても、上記でConfigで定義した内容で、Readerの実装は不要となっています。
下記で読み込むCSVの名前と内容を定義して、読み込んでいます。

public FlatFileItemReader<UserDto> reader() {
    return new FlatFileItemReaderBuilder<UserDto>()
        .name("userItemReader")
        .resource(new ClassPathResource("sample-data.csv"))
        .delimited()
        .names(new String[]{"lastName", "firstName"})
        .fieldSetMapper(new BeanWrapperFieldSetMapper<UserDto>() {{
            setTargetType(UserDto.class);
        }})
        .build();
}

読み込んだCSVの内容を保存するdtoを以下のように用意しました。
lombokを使っているので、プロパティの定義のみです。

package com.example.demo.dto;

import lombok.Data;

@Data
public class UserDto {
    private String lastName;

    private String firstName;
}

Processorの実装

続いて、Processorの実装です。
読み込んだCSVの内容から、名字と名前が両方設定されていれば、有効(validをtrue)とするようにしています。

package com.example.demo.processor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import com.example.demo.dto.UserDto;
import com.example.demo.entity.User;

@Component
public class UserItemProcessor implements ItemProcessor<UserDto, User> {

    private static final Logger log = LoggerFactory.getLogger(UserItemProcessor.class);

    @Override
    public User process(UserDto userDto) throws Exception {
        User user = new User();
        BeanUtils.copyProperties(userDto, user);

        if (!ObjectUtils.isEmpty(userDto.getLastName()) && !ObjectUtils.isEmpty(userDto.getFirstName())) {
            user.setValid(true);
        }

        log.info(user + "");
        return user;
    }
}

Writerの実装

続いて、Writerの実装です。
WriterもReaderと同様、Configで定義した内容で実装は不要となっています。
リポジトリを指定して、DBに保存しています。

@Bean
public ItemWriter<User> writer(UserRepository userRepository) {
    RepositoryItemWriter<User> repositoryItemWriter = new RepositoryItemWriter<>();
    repositoryItemWriter.setRepository(userRepository);
    repositoryItemWriter.setMethodName("save");

    return repositoryItemWriter;
}

以下がリポジトリとモデルの実装です。

package com.example.demo.entity;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.Data;

@Entity
@Data
@Table(name = "users")
public class User {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private int id;

    @Column(nullable = false)
    private String lastName;

    @Column(nullable = false)
    private String firstName;

    private boolean valid = false;
}
package com.example.demo.repository;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import com.example.demo.entity.User;

@Repository
public interface UserRepository extends JpaRepository<User, Integer> {}

Listenerの実装

最後に、Listenerの実装です。
直接関係ありませんが、例として記載します。

Jobの設定で、Listenerを指定しています。

ListenerではJobの終了後に、DBを検索して、内容をログに出力する処理を行っています。

package com.example.demo.listener;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;

import com.example.demo.entity.User;
import com.example.demo.repository.UserRepository;

import lombok.NonNull;
import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class JobCompletionNotificationListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    @NonNull
    private final UserRepository userRepository;

    @Override
    public void afterJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Time to verify the results");

            List<User> userList = userRepository.findAll();
            userList.forEach(user -> log.info(user + ""));
        }
    }
}

ビルド&実行

これで、プログラムは完成なので、ビルド&実行します。
複数のJobがある時にJobを指定しないで実行すると、「Job name must be specified in case of multiple jobs」というエラーになるので、指定して実行します。

./gradlew bootrun --args='--spring.batch.job.name=importUserJob'

実行すると、コンソールの出力は以下のようになります。
まず、Processorで加工した結果をログに出力しているのと、ListenerでDB検索した結果を出力しているのが確認できます。

INFO 6705 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=importUserJob]] launched with the following parameters: [{}]
INFO 6705 --- [  restartedMain] o.s.batch.core.job.SimpleStepHandler     : Executing step: [importUserStep]
INFO 6705 --- [  restartedMain] c.e.demo.processor.UserItemProcessor     : User(id=0, lastName=山田, firstName=太郎, valid=true)
INFO 6705 --- [  restartedMain] c.e.demo.processor.UserItemProcessor     : User(id=0, lastName=鈴木, firstName=一郎, valid=true)
INFO 6705 --- [  restartedMain] c.e.demo.processor.UserItemProcessor     : User(id=0, lastName=田中, firstName=二郎, valid=true)
INFO 6705 --- [  restartedMain] c.e.demo.processor.UserItemProcessor     : User(id=0, lastName=後藤, firstName=三郎, valid=true)
INFO 6705 --- [  restartedMain] c.e.demo.processor.UserItemProcessor     : User(id=0, lastName=三浦, firstName=四郎, valid=true)
INFO 6705 --- [  restartedMain] c.e.demo.processor.UserItemProcessor     : User(id=0, lastName=山本, firstName=, valid=false)
INFO 6705 --- [  restartedMain] o.s.batch.core.step.AbstractStep         : Step: [importUserStep] executed in 134ms
INFO 6705 --- [  restartedMain] .e.d.l.JobCompletionNotificationListener : !!! JOB FINISHED! Time to verify the results
INFO 6705 --- [  restartedMain] .e.d.l.JobCompletionNotificationListener : User(id=1, lastName=山田, firstName=太郎, valid=true)
INFO 6705 --- [  restartedMain] .e.d.l.JobCompletionNotificationListener : User(id=2, lastName=鈴木, firstName=一郎, valid=true)
INFO 6705 --- [  restartedMain] .e.d.l.JobCompletionNotificationListener : User(id=3, lastName=田中, firstName=二郎, valid=true)
INFO 6705 --- [  restartedMain] .e.d.l.JobCompletionNotificationListener : User(id=4, lastName=後藤, firstName=三郎, valid=true)
INFO 6705 --- [  restartedMain] .e.d.l.JobCompletionNotificationListener : User(id=5, lastName=三浦, firstName=四郎, valid=true)
INFO 6705 --- [  restartedMain] .e.d.l.JobCompletionNotificationListener : User(id=6, lastName=山本, firstName=, valid=false)
INFO 6705 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=importUserJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 157ms

DBを直接確認しても、下記のようにデータが登録されているのが確認できました。

タイトルとURLをコピーしました