pooney
article thumbnail

 

안녕하세요. 오늘은 대용량의 데이터를 빠르게 처리 하기 위해 배치를 구성해야 하는 경우가 많은데요. 이러한 병렬처리가 를 하기 위한 Multi-Threaded Step 방식에 대해서 알아 보려고합니다.  

 

 

 

 

 

Multi-Threaded Step 

하나의 Step에서 멀티스레드를 사용해 ItemReader, ItemProcessor, ItemWriter를 병렬로 실행해 동일한 데이터 소스를 여러 스레드가 동시에 일고 처리 하는 기능을 제공합니다. 

 

 

즉 Multi-threaded Step은 단일 Step에서 여러 쓰레드를 병렬로 실행하여 작업을 처리하는 방식입니다. 자세한 설명은 아래의 Spring docs에 자세히 설명 되어 있으니 참고해 보는 것도 좋을 것 같습니다. 

 

 

 

https://docs.spring.io/spring-batch/reference/scalability.html

 

Scaling and Parallel Processing :: Spring Batch

Many batch processing problems can be solved with single-threaded, single-process jobs, so it is always a good idea to properly check if that meets your needs before thinking about more complex implementations. Measure the performance of a realistic job an

docs.spring.io

 

 

 

 

 

 

 

 

제가 생각하는 동작하는 과정은 "TaskExecutor를 사용하여 하나의 Step을 여러 쓰레드가 데이터를 처리한다" 인 것 같습니다.

 

글로 설명하는 것보다 코드로 보는게 더 이해가 쉽기 때문에 코드로 한번 보겠습니다.  해당 Job은 회원목록을 조회하고 다른 테이블에 Insert하는 Migration작업을 하는 배치입니다. 

 

 

 

@RequiredArgsConstructor
@Configuration
public class MultiThreadJob {
    private final String JOB_NAME = "MULTI_JOB";
    private final String STEP_NAME = JOB_NAME + "_STEP";
    private final String POOL_NAME = JOB_NAME + "_POOL";
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    @Value("${chunkSize:1000}")
    private int chunkSize;

    @Value("${poolSize:10}")
    private int poolSize;


    @Bean(name = JOB_NAME)
    public Job job(@Qualifier(STEP_NAME) Step step) throws Exception {
        return jobBuilderFactory.get(JOB_NAME)
                .start(step)
                .incrementer(new RunIdIncrementer())
                .build();
    }


    @Bean(name = STEP_NAME)
    @JobScope
    public Step step() throws Exception {
        return stepBuilderFactory.get(STEP_NAME)
                .<MemberDto, MigrationMemberDto>chunk(chunkSize)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .taskExecutor(executor()) 
                .throttleLimit(poolSize) // (1)
                .build();
    }




    @StepScope
    @Bean(name = JOB_NAME + "_reader")
    public JdbcPagingItemReader<MemberDto> reader() throws Exception {
        return new JdbcPagingItemReaderBuilder<MemberDto>()
                .pageSize(chunkSize)
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(MemberDto.class))
                .queryProvider(createQueryProvider())
                .saveState(false) // (2)
                .name("jdbcPagingItemReader")
                .build();

    }


    @Bean(name = JOB_NAME +"_processor")
    @StepScope
    public ItemProcessor<MemberDto, MigrationMemberDto> processor() {
        return memberdto ->  new MigrationMemberDto(memberdto.getMemberIdx(), "KEY_" + memberdto.getMemberId(), "VALUE_" + memberdto.getMemberId());
    }

    @Bean
    public PagingQueryProvider createQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource);
        queryProvider.setSelectClause("member_idx, member_id");
        queryProvider.setWhereClause("member_idx < 10000");
        queryProvider.setFromClause("from member");
        queryProvider.setSortKeys(Collections.singletonMap("member_idx", Order.ASCENDING));

        return queryProvider.getObject();
    }

    
    @Bean(name = POOL_NAME)
    public TaskExecutor executor() { 
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //(3)
        executor.setCorePoolSize(poolSize);
        executor.setMaxPoolSize(poolSize);
        executor.setThreadNamePrefix("multi-thread-");
        executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        executor.initialize();
        return executor;
    }




    @StepScope
    @Bean(name = JOB_NAME + "_WRITER")
    public JdbcBatchItemWriter<MigrationMemberDto> writer() {
        JdbcBatchItemWriter<MigrationMemberDto> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource); // 데이터소스 설정
        writer.setSql("INSERT INTO migration_member (member_idx, mg_key,mg_value) VALUES (:memberIdx, :key, :value)"); // SQL 설정
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.afterPropertiesSet(); // 필수 설정 완료
        return writer;
    }
}

 

 

(1) throttleLimit

  • 쓰레드풀의 쓰레드중 몇개를 사용할지를 결정합니다.
  • 설정을 5로 하고 10개의 쓰레가 있다면 5개만 사용한다는 것을 의미합니다. 

(2) TashPoolTaskExcutor

  • 쓰레드 풀을 쉽고 간단하게 설정 할 수 있도록 도와주는 클래스입니다. 

 

 

위에 Spring docs의 테스트 코드에서는 SimpleAsyncTashExcutor 를 사용했는데  아래와 같이 SimpleAsyncTashExcutor 는 ThreadPool을 이용하여 Thread를 재사용 하는 방식이 아닌 새로 생성해서 사용하는 방식임으로 TaskExcutor를 구현해서 사용하라고 되어 있습니다. 



 

(3) saveState

  • 실패한 지점을 기록하는 기능입니다.
  • 멀티쓰레드 방식에서는 이기능은 false로 두는것이 좋습니다.  멀티쓰레드 방식에서 순서를 보장 하지 않기 때문에 잘 못 동작 했을 경우 오동작 할 가능성이 큽니다. 때문에 처음부터 다시 시작하도록 하는 것이 좋습니다. 

 

 

 

 

 

 

우선 비교를 하기 위해서 한번 단일 쓰레드 방식으로 진행을 해보겠습니다.

 

 

 

 

 

 

 

단일 쓰레드이기 때문에 Main 쓰레드가 동작하는 것을 확인 해 볼 수가 있습니다. 또한 chunk를 1000으로 설정 했기 때문에 그에 맞게 조회를 진행 하는 것을 볼 수 가 있습니다.  이번에는 단일 쓰레드가 1만건을 처리하는 걸리는 시간을 한 번  확인 해 보겠습니다. 

 

 

 

 

 

 

 

[
  {
    "START_TIME": "2024-12-21 22:14:40.863000",
    "END_TIME": "2024-12-21 22:14:43.163000"
  }
]



 

 

 

 

약 3초 정도 걸리는 것을 확인 해 볼 수 있습니다. 그러면 이제는 Multi-threaded Step 방식을 사용했을을때는 어떻게 달라지는 확인 해 보 겠습니다. 

 

 

 

 

 

 

 

 

 

달라진 모습이 보이시나요? thread1~10 까지 무작위로 실행 되는 것을 확인 해 볼 수가 있습니다.  이번에는 Multi-threaded Step 에서는 얼마나 시간이 걸리는 지 확인 해 보겠습니다. 

 

 

 

 

[
  {
    "START_TIME": "2024-12-21 22:22:29.989000",
    "END_TIME": "2024-12-21 22:22:31.345000"
  }
]

 

 

약 2초 걸리는 것을 확인 할 수가 있습니다. 데이터가 1만건 정도 이기 때문에 thread가 많아도 처리할 대상이 적으면 유의미한 효과는 보기 힘들 수 있지만 저희는 멀티 쓰레드로 열심히 동작 하고 빠르게 처리가 가능하겠구나 정도로만 알 수 있으면 될 것 같습니다. 

 

 

 

 

 

 

 

Cursor로 동작 시킬때는 주의 사항이 있습니다. 

 

 

 

 

흔히 조건에 상태값등이 들어간 경우 Paging을 사용할때 Cursor를 사용하는 경우가 많습니다. 만약 아래와 같이 변경 하면 어떻게 되는지 확인 해보겠습니다. 

 

 

@StepScope
@Bean(name = JOB_NAME + "_reader")
public JdbcCursorItemReader<MemberDto> reader() throws Exception {
    String sql = "select member_idx, member_id from member where member_idx < 10000 ORDER BY member_idx asc";
    return new JdbcCursorItemReaderBuilder<MemberDto>()
            .fetchSize(chunkSize)
            .dataSource(dataSource)
            .rowMapper(new BeanPropertyRowMapper<>(MemberDto.class))
            .saveState(false) 
            .sql(sql)
            .name("jdbcPagingItemReader")
            .build();

}

 

 

 

 

 

 

그러면 아래와 같은 에러를 만나게 될 수 있는데요.

 

 

 

 

org.springframework.dao.InvalidDataAccessResourceUsageException: Unexpected cursor position change.
	at org.springframework.batch.item.database.AbstractCursorItemReader.verifyCursorPosition(AbstractCursorItemReader.java:395) ~[spring-batch-infrastructure-4.3.6.jar:4.3.6]
	at org.springframework.batch.item.database.AbstractCursorItemReader.doRead(AbstractCursorItemReader.java:506) ~[spring-batch-infrastructure-4.3.6.jar:4.3.6]

 

 

 

 

 

이유는 간단합니다. JdbcCursorItemReader는 Thread-safe 하지 않다는 것입니다. 그렇기 때문에 멀티쓰레드 방식에서는  위와 같은 에러가 발생 할 수가 있습니다. 아래는 JdbcPagingItemReader 와 JdbcCursorItemReader 인데요  빨삭색 박스를 보시면 JdbcPagingItemReader는 Thread-safe 하다고 명시를 해놓은 반면에 JdbcCursorItemReader는 관련 내용이 없습니다. 

 

 

 

 

 

 

JdbcPagingItemReader.class 

 

 

 

JdbcCursorItemReader.class

 

 

 

 

 

그러면 Cursor를 사용 할 수 없는 걸까요? 다행히도 아래와 같이 Spring docs 에서 Reader가 Thread-safe 하지 않다면 SynchronizedItemStreamReader을 사용 하면 된다고 설명해주었습니다. 

 

 

 

 

 

 

 

아래와 같이 SynchronizedItemStreamReader에 위임을 하는 코드로 변경 해서 동작 시키면 정상 동작을 확인 해 볼 수 있습니다. 

 

 

@StepScope
@Bean(name = JOB_NAME + "_reader")
public SynchronizedItemStreamReader<MemberDto> reader() throws Exception {
    String sql = "select member_idx, member_id from member where member_idx < 10000 ORDER BY member_idx asc";
    JdbcCursorItemReader<MemberDto> cursorItemReader = new JdbcCursorItemReaderBuilder<MemberDto>()
            .fetchSize(chunkSize)
            .dataSource(dataSource)
            .rowMapper(new BeanPropertyRowMapper<>(MemberDto.class))
            .saveState(false) 
            .sql(sql)
            .name("jdbcPagingItemReader")
            .build();

    return new SynchronizedItemStreamReaderBuilder<MemberDto>()
            .delegate(cursorItemReader)
            .build();
}

 

 

 

 

 

 

가능한 이유는 synchronized기능을 사용 하여 read() 를 thread-safe하게 만들어 주어 멀티쓰레드 환경에서 정상 동작 하도록 만들었기 때문입니다. 

 

 

 

 

 

 

 

 

 


Thread-safe하다는 것을 보고 그러면 속도가 떨어지는거 아닌가?



라는 생각을 가질 수가 있는데요. 대부분의 시간이 걸리는 요소는 Processor와 Writer에서 많은 비용과 시간이 들게 되는데  Processor와 Writer에서 멀티쓰레드로 동작하니 단일 쓰레드 방식 보다 멀티 스레드로 구성시 훨씬 더  빨리 완료가 될 수 있습니다. 

 

 

 

 

제가 해당 내용을 작성하게 된 이유는 대용량의 데이터를 빠르게 어떻게 처리 할 수 있을까? 에서 시작하여 공부하게 되었습니다. 단순히 멀티쓰레드로 사용하는게 아니라 어떻게 동작하는지 궁금했었는데 여러 자료를 참고 하면서 많은 걸 알 수 있었고 많은 분들에게 도움이 되었으면 좋겠습니다. 

 

 

 

 

 

 

 

 

https://docs.spring.io/spring-batch/reference/scalability.html

https://jojoldu.tistory.com/493

profile

pooney

@pooney

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!