Spring boot/spring-batch

[Spring-Batch] Partitioning을 이용한 대량의 데이터 처리

pooney 2024. 12. 16. 21:27

 

 

안녕하세요. 오늘은 Spring Batch를 이용한 Partitioning에 대해서 작성해 보려고 합니다. 프로젝트를 진행하면서 

대용량의 데이터를 처리해야 할 일이 발생을 했는데요.  대용량을 빠르게 처리 하기 위해서 흔히 병렬적으로 처리하는 방법을 사용하는 경우가 많습니다. 병렬적으로 처리 하기 위해 대표적인 Scalling 기능인  Multi-Threaded Step 방식도 많이 생각 하실 겁니다. 하지만 이번에는 Partitioning 기능을 이용한 방법에 대해서 알아 보려고  합니다. 

 

 

 

 

 

 

 

Partitioning

데이터를 물리적으로 분할하여 각 파티션을 별개의 Step으로 처리하는 방식입니다. 각 파티션은 독립적으로 실행되며, 멀티스레드 환경 또는 클러스터링 환경에서 실행될 수 있도록 제공하는 기능입니다. 

 

 

 

 

파티셔닝은 아래 그림과 같이 Manager Step은 통해 대용량 데이터를 위해서 정해 놓은 수의 Worker에게 요청 해서 분산 처리를 하는 것이라고 할 수 있습니다. 

 

 

 

 

 

 

 

 

즉 우리가 흔히 평상시에 작성하는 Step이 Worker가 되고 이것을 알맞게 분산 처리 하도록 도와주는 Step이 Manger라고 생각 하시면 쉽습니다.  그렇기 때문에 Manager를 잘 구성하면 나머지는 크게 다르지 않습니다. 저희가 주의깊게 볼 것은 기본적으로 PartitionStep(Manager)과 PartitionHandler, Partitioner인데 이것에 대해 알아 보겠습니다.

 

 

 

 

PartitionStep(Manager)

 

PartitionStep은 여러 Worker들을 관리하게 되는 Step으로 아래와 같은 형태로 구성을 하게 됩니다. 

 

@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

 

 

 

 

 

 

 

PartitionHandler

 

PartitionHandler는 어떤 Worker Step을 사용하고 다룰 것인지, 어떤 Thread Pool을 사용할지, gridSize를해 생성할 별도의 Step 실행 횟수를 결정하게 됩니다. 

 

 

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

 

 

 

 

 

Partitioner

 

Partitioner는 새로운 단계 실행을 위한 입력 매개변수로 실행 컨텍스트를 생성합니다. 즉 몇개의 Partition으로 나누어서 진행 할지 gridSize를 통해서 설정 할 수 있습니다.  Partitioner 인터페이스는 단 하나의 메소드를 가지고 있는데 Key 값은 각 Step Excution의 유니크 한 이름을 결정하게 됩니다. 

 

 

 

 

 

 

 

 

 

 

아래는 Partition의 시퀀스 다이어그램으로 역할과 흐름을 보여주고 있습니다. 

 

 

 

 

 

 

 

 

 

 

 

아무래도 글로보는 것보다 코드로 보는것이 더 이해하기 쉬우니 이제부터는 코드를 통해서 알아 보도록 하겠습니다. 

 

 

문제 : 회원 1만이 존재하고 각 회원의 설정값을 1:N(10) 형태로 새로운 Table에 마이그레이션을 해야 한다고 하겠습니다. 

 

 

 

 

 

.

우선 회원별 idx를 기준으로 각 Step이 동작 할 수 있도록 range는 나눌건데 이러한 작업을 도와줄 Patitioner를 아래와 같은 코드로 작성하겠습니다. 

 

 

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

import java.util.HashMap;
import java.util.Map;

public class MigrationPartitioner implements Partitioner {
    private final int totalMembers;

    public MigrationPartitioner(int totalMembers) {
        this.totalMembers = totalMembers;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        
        int membersPerPartition = totalMembers / gridSize;
        int remainingMembers = totalMembers % gridSize;

        int start = 1; // 회원 ID 또는 인덱스 시작
        for (int i = 0; i < gridSize; i++) {
            int end = start + membersPerPartition - 1;
            if (i < remainingMembers) {
                end++; // 남은 회원 수를 균등하게 분배
            }

            ExecutionContext context = new ExecutionContext();
            context.putInt("start", start);
            context.putInt("end", end);
            context.putString("partitionName", "Partition" + i);

            partitions.put("partition" + i, context);

            start = end + 1;
        }

        return partitions;
    }
}

 

 

 

 

 

 

어떤 Worker Step을 사용하고 파티셔닝을 할 것인지 설정하는 migrationManager(PartitionStep)을 통해서 설정하게 되고 어떤 쓰레드풀과 몇개의 파티션으로 나눌지를 설정하는 PartitionHandler를 아래와 같은 형태로 작성 하겠습니다. 

 

 

 

@JobScope
@Bean(name = JOB_NAME + "_Manager")
public Step migrationManager() throws Exception {
    return stepBuilderFactory.get("migration.manager") 
            .partitioner("migration", partitioner()) // 1 
            .step(step()) // 2
            .partitionHandler(partitionHandler()) // 3
            .build();
}

@StepScope
@Bean(name = JOB_NAME + "_partitionHandler")
public TaskExecutorPartitionHandler partitionHandler() throws Exception {
    TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler(); //4 
    partitionHandler.setStep(step());
    partitionHandler.setTaskExecutor(executor());
    partitionHandler.setGridSize(poolSize);
    return partitionHandler;
}

 

 

 

(1) partioner()

  •  해당 기능을 통해 어떻게 파티셔닝할지를 다루는 Partitioner를 설정하게 됩니다. 
  •  예제에서는 회원 1만을 기준으로 Range를 설정하는 Partitioner를 작성했습니다.

 

 

(2) step()

  •  어떠한 step을 병렬적으로 동작 시킬 것인지를 설정하게 됩니다. 즉 worker step을 의미하게 됩니다
  • 파티셔닝된 Step은 서로 다른 StepExctution을 가집니다.

 

(2) partitionHandler ()

  • 멀티쓰레드로 수행 할 수 있도록 구현체를 설정합니다. 
  • 멀티쓰레드로 동작하도록  TaskExecutorPartitionHandler를 사용합니다. 
  • gridSize를 통해 partition을 나누게 됩니다. 일반적으로 poolSize랑 동일하게 작성을 합니다.
    (예제에서는 3으로 설정하여 진행했습니다)

 

 

 

 

 

 

 

 

 

이번에는  Processor를  살펴 볼건데요 회원을 조회 한 후에 각 회원당 10개의 row로 나누기 위한 코드를 아래와 같은 형태로 구성했습니다. 

 

 

    @Bean(name = JOB_NAME + "_processor")
    @StepScope
    public ItemProcessor<MemberDto, List<MigrationMemberDto>> processor() {
        return memberdto -> IntStream.rangeClosed(1, 10)
                .mapToObj(i -> new MigrationMemberDto(memberdto.getMemberIdx(), "KEY_" + i, "VALUE_" + i))
                .collect(Collectors.toList());
    }

 

 

 

 

 

 주의 사항으로는 Worker Step에는 @JobScope는 사용하면 안됩니다. 

 

 

 

    // @JobScope  해당 어노테이션은 Worker step에서는 제외 해야합니다. 
    @Bean(name = STEP_NAME)
    public Step step() throws Exception {
        return stepBuilderFactory.get(STEP_NAME)
                .<MemberDto, List<MigrationMemberDto>>chunk(chunkSize)
                .reader(reader(null,null))
                .processor(processor())
                .writer(writer())
                .build();
    }

 

 

 

 

 

만약 @JobScope를 사용하면  아래와 같은 에러를 만날 수가 있는데요. 때문에 해당 어노테이션은 제외하시길 바랍니다. 

 

 

 

 

 

java.util.concurrent.ExecutionException: org.springframework.beans.factory.support.ScopeNotActiveException: Error creating bean with name 'scopedTarget.PARTITION_JOB_STEP': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for job scope
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[na:na]
	at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.doHandle(TaskExecutorPartitionHandler.java:120) ~[spring-batch-core-4.3.6.jar:4.3.6]
	at org.springframework.batch.core.partition.support.AbstractPartitionHandler.handle(AbstractPartitionHandler.java:61) ~[spring-batch-core-4.3.6.jar:4.3.6]

 

 

 

 

 

 

전체적인 코드의 형태는 아래와 같습니다. 

 

 

 

 

@RequiredArgsConstructor
@Configuration
public class PartitionJob {

    private final String JOB_NAME = "PARTITION_JOB";
    private final String STEP_NAME = JOB_NAME + "_STEP";
    private final String POOL_NAME = JOB_NAME + "_POOL";
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    @Value("${chunkSize:1000}")
    private int chunkSize;

    @Value("${poolSize:3}")
    private int poolSize;


    @Bean(name = JOB_NAME)
    public Job job() throws Exception {
        return jobBuilderFactory.get(JOB_NAME)
                .start(migrationManager())
                .incrementer(new RunIdIncrementer())
                .build();
    }


    @JobScope
    @Bean(name = JOB_NAME + "_Manager")
    public Step migrationManager() throws Exception {
        return stepBuilderFactory.get("migration.manager")
                .partitioner("migration", partitioner()) //
                .step(step()) // 2
                .partitionHandler(partitionHandler()) // 3
                .build();
    }



    // @JobScope  해당 어노테이션은 Worker step에서는 제외 해야합니다.
    @Bean(name = STEP_NAME)
    public Step step() throws Exception {
        return stepBuilderFactory.get(STEP_NAME)
                .<MemberDto, List<MigrationMemberDto>>chunk(chunkSize)
                .reader(reader(null,null))
                .processor(processor())
                .writer(writer())
                .build();
    }





    @StepScope
    @Bean(name = JOB_NAME + "_reader")
    public JdbcPagingItemReader<MemberDto> reader(
            @Value("#{stepExecutionContext[start]}") Long start,
            @Value("#{stepExecutionContext[end]}") Long end

    ) throws Exception {
        Map<String, Object> params = new HashMap<>();
        params.put("start", start);
        params.put("end", end);

        return new JdbcPagingItemReaderBuilder<MemberDto>()
                .pageSize(chunkSize)
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(MemberDto.class))
                .queryProvider(createQueryProvider())
                .parameterValues(params)
                .saveState(false) //
                .name("jdbcPagingItemReader")
                .build();

    }


    @Bean
    public PagingQueryProvider createQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource);
        queryProvider.setSelectClause("member_idx, member_id");
        queryProvider.setWhereClause("member_idx between :start and :end");
        queryProvider.setFromClause("from member");
        queryProvider.setSortKeys(Collections.singletonMap("member_idx", Order.ASCENDING));

        return queryProvider.getObject();
    }


    @Bean(name = POOL_NAME)
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(poolSize);
        executor.setMaxPoolSize(poolSize);
        executor.setThreadNamePrefix("partition-");
        executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        executor.initialize();
        return executor;
    }


    @StepScope
    @Bean(name = JOB_NAME + "_WRITER")
    public ListWriter<MigrationMemberDto> writer() {
        JdbcBatchItemWriter<MigrationMemberDto> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource); // 데이터소스 설정
        writer.setSql("INSERT INTO migration_member (member_idx, mg_key,mg_value) VALUES (:memberIdx, :key, :value)"); // SQL 설정
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.afterPropertiesSet(); // 필수 설정 완료
        return new ListWriter<>(writer);
    }


    @StepScope
    @Bean(name = JOB_NAME + "_partitionHandler")
    public TaskExecutorPartitionHandler partitionHandler() throws Exception {
        TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler(); //4
        partitionHandler.setStep(step());
        partitionHandler.setTaskExecutor(executor());
        partitionHandler.setGridSize(poolSize);
        return partitionHandler;
    }


    @Bean(name = JOB_NAME + "_partitioner")
    @StepScope
    public MigrationPartitioner partitioner() {
        return new MigrationPartitioner(10000);
    }




    @Bean(name = JOB_NAME + "_processor")
    @StepScope
    public ItemProcessor<MemberDto, List<MigrationMemberDto>> processor() {
        return memberdto -> IntStream.rangeClosed(1, 10)
                .mapToObj(i -> new MigrationMemberDto(memberdto.getMemberIdx(), "KEY_" + i, "VALUE_" + i))
                .collect(Collectors.toList());
    }

}

 

 

 

 

그러면 이제는 실행하고 로그를 확인 해보면 PoolSize=3으로 설정 했기 때문에 Partition1~3번 까지 병렬적으로 실행 되는 것을 확인 해 볼 수 있고 각 Partition은 자기의 Range에 맞게 Chunk단위로 조회를 진행하는 것을 로그를 통해 알 수 있습니다. 

 

 

 

 

 

파티션명 Range
Partition-1 1~ 3334
Partition-2 3335 ~ 6667
Partition-3 6668~10000

 

 

 

이번에는 StepExcution을 확인해 보면 PartionStep이 먼저 실행 된 후에 각 PARTITION_JOB_STEP:partion{num} 이 실행 된게 보이시나요? 여기서 각 {num}은 위에 작성한 MigrationPartitioner에서 부여한 번호 입니다.

 

 

 

 

 

 

 

{num}값이 위의 쓰레드 번호랑 일치 하지 않는 이유는 Partition을 나눌때 아래와 같이 0번 부터 시작했기 때문입니다. 결과적으로 3개의 Partition이 정상적으로 동작을 하고 있고 각 Step이 자기만에 고유한 StepExcution을 가지고 Range에 맞게 동작하고 있다는 것을 확인 할 수 있습니다.  

 

 

 

 

 

 

 

 

 

그러면 Multi-threaded Step과 Partitioning의 근본적인 차이는 무엇일까요?

 

 

 

 

Multi-threaded Step은 하나의 Step을 여러 쓰레드가 병렬로 처리하고 Partitioning은 파티션으로 하나의 Step을 별도의 여러 Step으로 나누고 각 쓰레드가 나누어진 쓰레드(Worker) 를 진행한다. 

 

 

 

 

 

 

Multi-threaded Step

  • 하나의 Step을 여러 쓰레드가 병렬로 데이터 처리
  • 단순히 JVM 내에서 쓰레드를 사용해 병렬화하는 방식으로, 설정이 간단하고 소규모 데이터에 적합.
  • TaskExecutor만 추가하여 비교적 간단하게 사용 가능 
  • 데이터 크기가 비교적 작을때 사용

 

 

 

Partitioning  Step

  • 하나의 Step을 별도의 여러 Step으로 나누고 각 쓰레드가 나누어진 쓰레드(Worker) 를 진행한다. 
  • 데이터를 물리적으로 나누고, 독립적인 Step으로 실행하는 방식으로, 대규모 데이터 처리 및 클러스터 환경에 적합.
  • Partitioner 작성 및 데이터 분할 로직 필요하기 때문에 구현이 복잡하다.
  • 대규모 데이터 처리에 적합

 

 

 

 

 

 

 

 

저는 회원 약 400만명의 데이터를 마이그레이션하는 배치를 담당을 하게 되었을때 빠른 시간안에 어떻게 해결 할 수 있을까? 를 고민 하던 중에 멀티쓰레드로 동작하는 배치가 필요했고 다양한 기법들이 존재 했는데 그중 저는 해당 작업을 회원수를 기준으로 잘라서 작업을 진행 하기로 했기 때문에 Partitioning을 이용하여 처리를 진행 했습니다. 그러면서 알게 되었던 것들에 대해서 이렇게 작성을 했는데 많은 분들에게 도움이 되었으면 합니다. 

 

 

 

 

 

 

 

https://docs.spring.io/spring-batch/reference/scalability.html

https://jojoldu.tistory.com/550