Spring boot/spring-batch

Spring boot - Spring Batch란?

pooney 2022. 6. 20. 22:16

안녕하세요 오늘은 많이 사용하는 Spring Batch를 설명해 드릴려고 합니다. Batch를 사용하기 위해선 스케줄러를 같이 사용하는데 대표적으로 아래와 같습니다 

 

 

 

  1. 쉽게 어노테이션으로 사용가능 한 @Scheduled
  2. DB 클러스터링을 도와주는 Quartz 
  3. CI/CD 젠킨스 

 

 

Batch + Scheduler 는 나중에 설명해 드리도록 하고 우선 들어가기전 Batch 와 스케줄러에 대해서 혼란 스러운 분들을 위하여 간단하게 설명하겠습니다. 

 

Batch란?

우리는 흔히 일을 진행하면서  아래와 같은 작업이 필요한경우 가 발생합니다. 

 

 

  • 많은 양의 데이터를 처리한 결과값을 저장하거나 이러한 결과를 사용자에게 보여줘야하는경우 
  • 배송중인 상태로 5일이 지나면 배송완료 상태로 변경 
  • 구매완료 7일 이후 자동 구매확정 

 

 

서비스를 운영하다 보시면 위와 같은 수많은 케이스가 발생 할 것입니다.  이러한 케이스를 실시간으로 반영 할 수 있을까요? 물론 가능합니만 너무 리소스 낭비적이고  실시간으로 수만개의 레코드를 Processing 하는 것은 자칫하다간 장애를 일으 킬수 있는 문제를 가집니다.  때문에 이러한 것들은 사용자의 요청이 많이 발생하지 않는 새벽이라던가 특정 시간때에 작업을 하면 더 효율적으로 서비스를 운영 할 수 있을 것입니다. 그러기 위해서 필요한 것이 바로 Batch 입니다. 

 

 

 

 

즉  Batch는 대용량의 데이터를 처리한다.

 

 

 

 

이러한 Batch에 대표적인게 Spring Batch입니다. Spring Batch는 로깅, 트랜잭션, 청크 , 실패에 따른 재시작,  특정 .Job에 뒤에 Job을 싱행, Job의 성공여부등 수많은 Batch에 필요한 기능들을 제공합니다. 

 

 

 

 

스케줄러란?

 

스케줄러의 사전적 의미 "시간에 따라 구체적으로 세운 계획" 입니다.

즉 시간에 따라 특정 JOB을 실행 하도록 도와주는 것입니다.  때문에 스케줄러는 시간에 따른 특정 작업을 실행하고 관리하는데 특화 되어 있지 특정 Job의 성공여부, 실패에 따른 재시작등을 관리하는 Batch와는 완전히 다른 개념입니다. 

 

 

 

 

 

 

 

 

 

 

Spring batch 

 

 

이번화에는 아래와 같은 구조인 Batch를 아주 간단한 구조로 만들어 보도록 하겠습니다.  후에 스케줄러등을 같이 결합하여 만드는 것을 소개 해 드리겠습니다. 

 

 

 

 

 

 

 

 

 

- JOB 

처리 하고자 하는 작업의 단위 입니다. 

 

- JobInstance 

JOB이 실행되면 JobInstance가 생성됩니다. 이러한 JobInstance를 구별은 JobParameters에 의해 이루어집니다. 

예를 들어 구매확정 JOB이 있다고 하면 이러한 JobParameters로 2022-06-22를 넘기면 2022-06-22에 실행된 구매확정 JobInstance가 생성되어 지는 것입니다. 말그대로 Instance이기 때문에 동일 한 JobParameters로 실행 시켰고 이러한 JOB이 성공했다면 이미 실행된 Job이라는 Error 메시지를 보실 수 있습니다.  만약 실패 했다면 다시 실행 가능합니다.  JOB 실행 도중 장애가 발생했다면 수정 후에 다시 실행시키면 원하는 결과를 보실 수 있을 것입니다

 

- JobParameters

Sprinb Batch는 동일한 Parameter의 Job을 실행 시킬 수 없습니다. JOB과 JobParameter로 JobInstance를 생성하기 때문입니다.  물론 실패 하였다면 다시 실행 가능합니다. 이러한 JobParameter는 String, Double, Long, Date가 존재합니다. 

 

-JobExecution

JobInstance에 대한 실행 정보를 나타 냅니다.  [구매확정- 2022-06-22- JobInstance] 실행하다가 실패하여 다시 동일한  [구매확정- 2022-06-22- JobInstance]를 실행하여 성공 했다면  [구매확정- 2022-06-22- JobInstance-Jobexecution-1], [구매확정- 2022-06-22- JobInstance-Jobexecution-2] 가 생성됩니다. 

 

 

 

 

 

 

 

 

 

그러면 이제 Spring Batch에 대해서 시작하도록 하겠습니다. 우선  dependency를 추가하도록 하겠습니다. 저는 JPA를 통한 Batch를 구성하기 위해 JPA를 추가하였습니다. 사용하시는 환경에 따라  dependency를 추가 하시면 됩니다. 

 

 

 

 

 

 

 

build.gradle

implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-batch'
compileOnly 'org.projectlombok:lombok'

 

 

application.yml

spring:
  batch:
    job:
      enabled: false // true : 어플리케이션 실행 시 등록된 Job을 싱행 / false : 비실행
      names: ${job.name:NONE} // 특정 JOB만 실행시킬경우 DEFAULT는 NONE
    jdbc:
      initialize-schema: always
  
  jpa:
    hibernate:
      ddl-auto: none
      naming:
        physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
      use-new-id-generator-mappings: false
    show-sql: true
    properties:
      hibernate.format_sql: true

  datasource:
    url: jdbc:mysql://localhost:3306/batch?serverTimezone=UTC&characterEncoding=UTF-8
    username: root
    password: 1234
    driver-class-name: com.mysql.cj.jdbc.Driver

  sql:
    init:
      schema-locations: classpath:initsql/schema.sql
      data-locations: classpath:initsql/data.sql
      mode: always
      
  logging:
      level:
        org.springframework.web.client: DEBUG
        org.hibernate.SQL : DEBUG
        org.hibernate.type : trace

 

 

 

 

Main.class 

 

 

 

 

@EnableBatchProcessing을 반드시 추가해주셔야 합니다. 안그러면 아래와 같은 에러가 발생하여  Batch를 실행시킬수 없습니다. 

 

 

 

 

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 0 of constructor in com.example.springbatch.job.BatchJob1 required a bean of type 'org.springframework.batch.core.configuration.annotation.JobBuilderFactory' that could not be found.


Action:

Consider defining a bean of type 'org.springframework.batch.core.configuration.annotation.JobBuilderFactory' in your configuration.


Process finished with exit code 1

 

 

 

 

MyJobTaskletOne

package com.example.springbatch.job;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@RequiredArgsConstructor
public class MyJobTaskletOne {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean(name =  "myJobTaskletOne_Job1")
    public Job myJobTaskletOne_Job1() {
        return this.jobBuilderFactory.get("myJobTaskletOne_Job1")
                .start(myJobTaskletOne_Job1_Step1())
                .next(myJobTaskletOne_Job1_Step2())
                .build();
    }

    @Bean
    public Step myJobTaskletOne_Job1_Step1() {
        return stepBuilderFactory.get("myJobTaskletOne_Job1_Step1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("myJobTaskletOne_Job1_Step1");
//                        if(1==1) throw  new RuntimeException();
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }

    @Bean
    public Step myJobTaskletOne_Job1_Step2() {
        return stepBuilderFactory.get("myJobTaskletOne_Job1_Step2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("myJobTaskletOne_Job1_Step2");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }

}

 

 

 

 

 

 

 

JobBuilderFactory를 통해 Job 생성하고 , StepBuilderFactory를 통해 Step을 생성합니다. Job은 하나이상의 Step으로 구성해야합니다.  각각의 job,step의 이름은 builderFactory를 통해 부여 할 수 있습니다. 

 

위의 예제는 Step은 Tasklet을 통해 구성한 형태입니다. Step을 구성하는 것은 크게 Tasklet[Reader, Processor,Writer]  2가지가 존재합니다.

 

 

 

 

 

Tasklet

 

사용자의 Custom 작업단위로써 Reader, Processor,Writer를 하나로 묶어서 처리하고자 사용합니다

 

 

 

Reader,Processor,Writer

 

역할을 나누어 처리가 가능 합니다.

 

 

  • Reader

 chunk 단위로 Reader가 db로부터 데이터를 읽습니다.

 

  • Processor

- Reader가 읽어 들인 데이터를 특정형태 데이터로 가공합니다.

 

  • Writer

- Processor가 가공한 데이터를 chunk단위로  DB 혹은 파일에 저장할 수 있으며,  chunk단위로 처리하니 DB의 경우 전체를 롤백하는 낭비를 줄일 수 있습니다. 

 

 

 

 

 

이렇게 만들어진 JOB을 실행 시키기 위해서는 몇가지 추가 작업을 해주셔야 합니다.  아래와 같이 Program arguments를 추가 해주셔야합니다

 

 

 

 

 

[requestDate]

 

Batch가 JOB INSTANCE를 생성하기 위한 JobParameter입니다.  이미 성공한 Job에 대해서는 동일한 JobParmeter로 실행 불가능 합니다. 

 

 

[--job.name] 

 

실행시킬 Job의 이름을 넣어주시면 됩니다. 이것은 application.yml에서 job.names에 값을 넣어주기 위함입니다. 보시면 None인 것을 보실 수 있는데 JobFactoryBuilder를 통해 등록한 job name과 argurments를 통해 넘긴 job name이 일치 하지않으면 Job을 동작시키지 않습니다. 

 

 

 

 

 

 

 

 

 

 

 

 

 

설정하시고 실행을 해보시면 아래와 같이 정상적으로 실행되면서 각 Step에서 값을 출력하는것을 볼 수 있습니다.

 

 

 

 

 

 

 

 

 

 

 

그러면 여기서 만약 Step1에서 문제가 발생하면 어떻게 될까요?

 

 

 

 

 

 

Flow

 

 

 

 

 

 

 

batch_job_instance.table

batch_job_excution_params.table

batch_job_excution.table

batch_step_exection.table

 

 

 

 

 

위의 그림처럼 실행 되었을때 생기는 데이터들입니다. 여기서  batch_step_exection.table을 보시면 step2는 생성되지 않았으며 Step1의 상태는 FAILED 이고 batch_job_excution.table에서도 상태는 FAILED인것을 확인 하 실 수 있습니다.

 

 

 

 

 

 

 

 

 

이번에는 Step을 Reader,Processor,Writer로 구현해보겠습니다. DB를 읽어서 사용할 것이니 필요한 상품 Entity를 생성하겠습니다.

 

 

Product.class

@Setter
@Getter
@Entity
@ToString
public class Product {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;
    private int price;
}

 

 

 

이제는 Database에 data가 필요합니다.  떄문에 초기화 SQL을 작성하여 어플리케이션이 동작시 해당 초기화 SQL을 통해 데이터를 setting 하도록 하겠습니다. 

 

 

 

 

 

 

 

 

 

schema.sql

DROP TABLE IF EXISTS product;

CREATE TABLE product (
	id BIGINT(19) NOT NULL AUTO_INCREMENT,
	name VARCHAR(255) NULL DEFAULT NULL,
	price INT(10) NOT NULL,
	PRIMARY KEY (id) USING BTREE
)

 

 

 

data.sql

INSERT INTO product (name, price) values ("사과", 1000);
INSERT INTO product (name, price) values ("복숭아", 1500);
INSERT INTO product (name, price) values ("포도", 1200);
INSERT INTO product (name, price) values ("에어컨", 900);
INSERT INTO product (name, price) values ("티비", 500);
INSERT INTO product (name, price) values ("컴퓨터", 2000);
INSERT INTO product (name, price) values ("노트북", 3000);
INSERT INTO product (name, price) values ("라면", 4000);
INSERT INTO product (name, price) values ("콜라", 1000);
INSERT INTO product (name, price) values ("사이다", 1000);
INSERT INTO product (name, price) values ("선충기", 3000);
INSERT INTO product (name, price) values ("아이폰", 3000);
INSERT INTO product (name, price) values ("갤럭시", 3000);

 

 

application.yml

  sql:
    init:
      schema-locations: classpath:initsql/schema.sql
      data-locations: classpath:initsql/data.sql
      mode: always

 

 

 

 

어플리케이션을 실행해 보시면 초기화 SQL이 실행되어 데이터가 정상적으로 들어가 있는것을 확인 할 수 있습니다.

 

 

 

 

 

 

 

 

 

 

이젠 Reader,Processor,Writer구성한 Job을 만들 도록 하겠습니다. 

 

 

 

 

 

 

MyJobTwo.class 

package com.example.springbatch.job;

import com.example.springbatch.entity.Product;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.persistence.EntityManagerFactory;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@RequiredArgsConstructor
@Configuration // Spring batch의 모든 Job은 Configuration으로 등록해야한다.
public class MyJobTwo {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;



    @Bean(name = "myJobTwo_job1")
    Job myJobTwo_job1(){
        return jobBuilderFactory.get("myJobTwo_job1")
                .start(myJobTwo_job1_step1()).build();
    }

    @Bean
    @JobScope
    Step myJobTwo_job1_step1(){
        return stepBuilderFactory.get("myJobTwo_job1_step1")
                .<Product, Product>chunk(10)
                .reader(reader(null))
                .processor(processor(null))
                .writer(writer(null))
                .build();

    }


    @Bean
    @StepScope
    public JpaPagingItemReader<Product> reader(@Value("#{jobParameters[requestDate]}") String requestDate){
        log.info("[Product Reader start!] : {} ", requestDate);
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("price", 1000);
        return new JpaPagingItemReaderBuilder<Product>()
                .pageSize(10)
                .parameterValues(parameterValues)
                .queryString("SELECT m FROM Product m WHERE m.price >= : price")
                .entityManagerFactory(entityManagerFactory)
                .name("JpaPagingItemReader")
                .build();
    }

    @Bean
    @JobScope
    public ItemProcessor <Product, Product> processor(@Value("#{jobParameters[requestDate]}") String requestDate){
        return new ItemProcessor<Product, Product>() {
            @Override
            public Product process(Product item) throws Exception {
                log.info("[Product Process start!] : {} ", item);
                item.setName(String.format("%s_%s_%s", "pooney" ,requestDate, item.getName()));
                return item;
            }
        };
    }

    @Bean
    @JobScope
    public JpaItemWriter <Product> writer(@Value("#{jobParameters[requestDate]}") String requestDate){
        log.info("[Product Writer start!] : {} ", requestDate);
        return new JpaItemWriterBuilder<Product>().entityManagerFactory(entityManagerFactory).build();
    }
}

 

 

 

저는 Paging하여 데이터를 읽어오기 위하여 JpaPagingItemReader를 사용 하였습니다.

Jpa가 아니신분들어 Jdbc 등 지원하는것을 사용하시면 될 꺼 같습니다. 저는 Jpa를 사용하였기때문에 QueryString 부분을 보시면 일반적인 SQL QUERY가 아니라 JPQL로 작성한것을 볼 수 가 있습니다.  Flow는 아래와 같습니다. 

 

 

 

 

Job Flow

  1. 상품의 가격이 1000원 이상 10개 조회 
  2. 상품의 이름 변경 ( pooney_날짜_상품명) 
  3. DB에 UPDATE 진행 

 

 

 

@Value("#{jobParameters[requestDate]}") 

- ProgramArgurments를 통해 넘긴 jobParameters를 받도록 도와줍니다. 이것을 사용하기 위해서는 @StepScope를 붙여 주셔야합니다. 

 

 

 

 

한번 Job Name을 변경하고 Job을 실행 시키보겠습니다

 

 

 

 

 

 

 

 

 

 

 

조건에 해당하는 상품들의 이름이 바뀐것을 확인 할 수 있습니다. 그러면 이번에는 Batch Table을 확인 해 보겠습니다.

 

 

 

batch_job_instance.table

batch_job_excution_params.table

batch_job_excution.table

batch_step_exection.table

 

 

 

 

 

myJobTwo_job1이 Completed된것을 확인 할 수가 있습니다. 

 

 

 

 

 

 

그러면 스케줄러를 통해 동작시키는 방법은 어떻게 할까요? 스프링에서 제공해주는 @Scheduled를 통해 아주 아주 간단하게 만들겠습니다. 이방법은 보완해야 하는점이 많습니다. 이렇게 쓸수 있구나 하는 정도만 알아주시면 됩니다. 후에 Quartz, 젠킨스를 통한 스케줄러로 Job을 실행시키는 방법을 소개해 드리겠습니다. 

 

 

 

 

JobStarter.class

package com.example.springbatch.job;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;

import java.time.LocalDateTime;
import java.util.Date;

@Configuration
@RequiredArgsConstructor
public class JobStarter {

    private final JobLauncher jobLauncher;
    private final Job myJobTaskletOne_Job1;
    private final Job myJobTwo_job1;


    @Scheduled(fixedDelay=5000)
    public void TaskletStart() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        JobParameters parameters = new JobParametersBuilder().addString("requestDate", LocalDateTime.now().toString()).toJobParameters();
        JobExecution execution = jobLauncher.run(myJobTaskletOne_Job1, parameters);
    }
    @Scheduled(fixedDelay=3000)
    public void JobStart() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        JobParameters parameters = new JobParametersBuilder().addString("requestDate", LocalDateTime.now().toString()).toJobParameters();
        JobExecution execution = jobLauncher.run(myJobTwo_job1, parameters);
    }
}

 

2022-06-22 17:31:32.633  INFO 15564 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [myJobTwo_job1_step1] executed in 52ms
2022-06-22 17:31:32.656  INFO 15564 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=myJobTwo_job1]] completed with the following parameters: [{requestDate=2022-06-22T17:31:32.519850600}] and the following status: [COMPLETED] in 107ms
2022-06-22 17:31:34.680  INFO 15564 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=myJobTaskletOne_Job1]] launched with the following parameters: [{requestDate=2022-06-22T17:31:34.660960}]
2022-06-22 17:31:34.699  INFO 15564 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [myJobTaskletOne_Job1_Step1]
myJobTaskletOne_Job1_Step1
2022-06-22 17:31:34.717  INFO 15564 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [myJobTaskletOne_Job1_Step1] executed in 18ms
2022-06-22 17:31:34.736  INFO 15564 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [myJobTaskletOne_Job1_Step2]
myJobTaskletOne_Job1_Step2
2022-06-22 17:31:34.752  INFO 15564 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [myJobTaskletOne_Job1_Step2] executed in 16ms
2022-06-22 17:31:34.766  INFO 15564 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=myJobTaskletOne_Job1]] completed with the following parameters: [{requestDate=2022-06-22T17:31:34.660960}] and the following status: [COMPLETED] in 80ms

 

 

 

 

스케줄링을 통해 3초 , 5초 마다 해당 Job이 실행되는것을 확인 할 수 있습니다.  아주 미흡하게 설명했는데 도움이 되셨으면 합니다. 감사합니다 .