Spring boot/spring-batch

[Spring-Batch] 대용량 랭킹 배치 설계

pooney 2025. 12. 6. 20:07

 

 

안녕하세요  오늘은 대용량 랭킹 배치에 대해서 알아보려고 합니다.  해당 랭킹 배치는 진행하던 프로젝트에서 경험 한 내용으로 제가 생각하던 것을 공유 하고 싶어 진행하게 되었습니다. 

 

 

제가 진행하던 랭킹은 나와 내친구들을 기준으로 하는 개인화 랭킹이였습니다. 해당 기능은 아래와 같은 조건이 있습니다. 

 

  • 회원 : 약 390만 
  • 친구 관계 : 회원마다 친구 수가 제각각 (특정 회원의 경우 친구가 3만명)
  • 친구 테이블 수 : 본인 * 친구들 = 약 1400만(?)
  • 본인 + 친구들 집하에 대해 랭킹을 계산해야함 
  • 랭킹 결과를 테이블에 적재한다. 

 

해당 조건을 기준으로 "대용량 친구 랭킹 배치" 를 어떻게 설계하면 안전하고 성능 이슈 없이 돌아가는지 정리해보겠습니다. 

 

스키마 설계 

 

-- 회원
create table member
(
    member_idx int unsigned auto_increment
        primary key,
    name       varchar(50)                        not null,
    created_at datetime default CURRENT_TIMESTAMP not null
);

-- 친구
create table friend
(
    idx        int unsigned auto_increment
        primary key,
    member_idx int unsigned                       not null,
    friend_idx int unsigned                       not null,
    created_at datetime default CURRENT_TIMESTAMP not null,
    constraint uk_friend
        unique (member_idx, friend_idx)
);

create index idx_friend_friend
    on friend (friend_idx);

create index idx_friend_member
    on friend (member_idx);


-- 친구 랭킹

create table friend_rank
(
    idx        int unsigned auto_increment
        primary key,
    member_idx int unsigned                       not null,
    friend_idx int unsigned                       not null,
    score      int                                not null,
    ranking    int                                not null,
    updated_at datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP,
    constraint friend_rank_pk
        unique (member_idx, friend_idx)
);

create index idx_rank_member
    on friend_rank (member_idx);

create index idx_rank_member_ranking
    on friend_rank (member_idx, ranking);


-- 스코어 


create table score
(
    idx        int unsigned auto_increment
        primary key,
    member_idx bigint unsigned                    not null,
    score      int      default 0                 not null,
    updated_at datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP
);

create index idx_score_member
    on score (member_idx);

 

 

 

[조건]

 

비슷한 환경을 구성하기 위해 다음을 전제로 진행했습니다.

  • friend 테이블에는 친구 관계만 존재한다
  • 본인은 friend 테이블에는 존재 하지 않는다.

 

그래서 랭킹을 계산할떄는 : 

  • 나 + 내친구들 집합에 대해 랭킹을 계산해야 함으로 friend + member(자기자신)을 합쳐서 처리한다.

 

 

"friend" 테이블에 자기 자신 row를 넣어주면 설계가 훨씬 깔끔하지만 운영 중인 스키마를 기준으로 설계를 진행했습니다.

 

 

 

 

여기서 "본인을 포함한 랭킹" 을 어떻게 만들까?

friend 테이블에는 본인이 없기 때문에, 각 회원의 개인화 랭킹은 아래와 같이 구성하려고 합니다

 

 

 

 

[의도]

  • friend 테이블에는 친구만 있으므로, 별도의 select로 본인을 합쳐야한다.
  • UNION ALL을 사용해서 "친구 스코어 + 본인스코어"를 하나의 스트림으로 만든다
  • member_idx와 범위 조건 between에 인덱스가 있으면 범위 스캔으로 처리된다.

 

 

 

 

병렬을 위한 Partitioner 사용 이유

 

크게 병렬로 돌릴 수 있는 방법은 Multi-threaded Step 과 Partitioner가 존재합니다. 그중 partioner을 사용한 이유는 정리하면 이렇습니다. 

  • 회원 390만명을 단일 Step, 단일 thread로 처리하면 시간이 너무 오래 걸린다.
  • member_idx는 PK 이므로, 범위를 잘 쪼개기 쉽다.
  • 따라서 member_idx 범위를 파티션으로 나누고, 각 파티션을 병렬로 처리하면 전체 소요시간을 줄일 수 있다.

 

@Component
@RequiredArgsConstructor
public class FriendRankingPartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> executionContextMap = new HashMap<>();
        String sql = """
                    select MIN(member_idx) as minMemberIndex, MAX(member_idx) as maxMemberIndex from friend
                """;
        FriendMinMaxDto friendMinMaxDto = jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(FriendMinMaxDto.class))
                .stream()
                .findFirst()
                .orElse(null);

        if (Objects.isNull(friendMinMaxDto) || Objects.isNull(friendMinMaxDto.minMemberIndex) || Objects.isNull(friendMinMaxDto.maxMemberIndex)) {
            return executionContextMap;
        }

        // 나누려는 개수
        long size = (friendMinMaxDto.maxMemberIndex - friendMinMaxDto.minMemberIndex) / gridSize + 1;
        // 시작 index
        long start = friendMinMaxDto.getMinMemberIndex();
        // 종료 index
        long end = start + size - 1;


        for (int i = 0; i < gridSize; i++) {
        	// 불필요한 파티션 생성을 막기 위함
            if (start > friendMinMaxDto.maxMemberIndex) break;
            // 마지막의 경우 마지막 index로 end를 설정하기 위함
            if (end > friendMinMaxDto.maxMemberIndex) {
                end = friendMinMaxDto.maxMemberIndex;
            }
            ExecutionContext executionContext = new ExecutionContext();
            executionContext.putLong("startIndex", start);
            executionContext.putLong("endIndex", end);

            executionContextMap.put("partition" + i, executionContext);
            start = end + 1;
            end = start + size - 1;
        }
        return executionContextMap;
    }


    @Setter
    @Getter
    public static class FriendMinMaxDto {
        private Integer minMemberIndex;
        private Integer maxMemberIndex;
    }
}

 

 

이렇게 하면 390만(?)명을 N개의 파티션으로 나누어서, 각 step은 390만/N + 1로 처리를 진행하게 됩니다.

 

 

 

 

Reader로 Cursor를 사용한 이유

크게 보면 reader는 paging 과 cursor가 존재합니다. 그중 저는 cursor를 선택했는데 이유를 정리하면 이렇습니다.

  • 랭킹 계산은 member_idx + score DESC 기준으로 정렬된 데이터를 스트림으로 처리하는 게 가장 깔끔하다.
  • Cursor 기반으로 한 번 정렬된 결과를 받아서, Processor에서 dense rank를 계산하기 쉽다.
  • Paging(OFFSET/LIMIT)으로 랭킹을 나누면 페이징 키 / 중복 처리 / 정렬 안정성 이슈를 신경 써야 한다
  •  

물론 단점도 존재합니다 단점은 이렇습니다 

  • 각 Step이 커넥션을 Step 전체 동안 점유하기 때문에
  • Partition 수 > 커넥션 풀 크기이면 바로 connection timeout이 난다.

 

때문에 저는 정렬된 스트림을 한번에 처리하고 싶었기 때문에 cursor를 선택하였습니다.  물론 이경우 신경 써야 하는 부분은 커넥션 풀과 parition 수를 잘 맞춰서 진행해야 합니다.  해당 풀에 대한 내용은 아래에서 정리 하기로 하고 우선 reader 부분을 보겠습니다. 

 

 

 

@Configuration
public class FriendRankingReaderConfig {
    private final static String sql = """
            
                        (
                            SELECT
                                f.member_idx AS memberIdx,
                                f.friend_idx AS friendIdx,
                                COALESCE(s.score, 0) AS score
                            FROM friend f
                            LEFT JOIN score s
                                ON s.member_idx = f.friend_idx
                            WHERE f.member_idx BETWEEN ? AND ?
                        )
                        UNION ALL
                        (
                            SELECT
                                m.member_idx AS memberIdx,
                                m.member_idx AS friendIdx,
                                COALESCE(s.score, 0) AS score
                            FROM member m
                            LEFT JOIN score s
                                ON s.member_idx = m.member_idx
                            WHERE m.member_idx BETWEEN ? AND ?
                        )
                        ORDER BY
                            memberIdx,
                            score DESC,
                            friendIdx
                        
            """;

    @StepScope
    @Bean
    public JdbcCursorItemReader<FriendScoreRow> friendScoreCursorReader(
            DataSource dataSource,
            @Value("#{stepExecutionContext['startIndex']}") Long startIndex,
            @Value("#{stepExecutionContext['endIndex']}") Long endIndex
    ){
        return new JdbcCursorItemReaderBuilder<FriendScoreRow>()
                .dataSource(dataSource)
                .name("friendScoreCursorReader")
                .sql(sql)
                .rowMapper((rs, rowNum) -> {
                    return FriendScoreRow.builder()
                            .friendIdx(rs.getLong("friendIdx"))
                            .memberIdx(rs.getLong("memberIdx"))
                            .score(rs.getInt("score"))
                            .build();
                })
                .preparedStatementSetter(ps -> {
                    ps.setLong(1, startIndex);
                    ps.setLong(2, endIndex);
                    ps.setLong(3, startIndex);
                    ps.setLong(4, endIndex);
                })
                .build();
    }
}

 

 

 

각 파티션에서 들고 있는 startIndex, endIndex를 기준으로 between을 통해 범위 스캔이 진행이되며, order by를 통해 정렬된 결과를 curosr로 스트리밍 받아 processor에서 랭킹을 연속적으로 계산하게 됩니다. 

 

 

 

 

 

왜 Processor에서 직접 dense rank를 계산하는가?

물론 rank와 같은 window function으로 db에서 바로 랭킹을 계산할 수도 있습니다. 하지만 아래와 같은 이유로 Processor에서 처리하는 방식을 선택했습니다. 

  • Mysql 버전에따라 대용량 window function 성능/메모리 사용이 불안정하다
  • 비즈니스 로직으로 동정 처리 방법등을 Java 코드에서 쉽게 바고 싶다.(query는 강결합되어 있음..)

때문에 Processor에서 직접 rank를 계산하는 쪽을 선택했습니다. 

 

 

 

@Builder
@Getter
public class FriendScoreRow {
    private Long memberIdx;
    private Long friendIdx;
    private Integer score;
}

 

@ToString
@Builder
@Getter
public class FriendRanking {
    private Long memberIdx;
    private Long friendIdx;
    private Integer score;
    private Integer ranking;
}

 

@StepScope
@Component
public class FriendRankingProcessor implements ItemProcessor<FriendScoreRow, FriendRanking> {

    private Long currentMemberIdx = null;
    private Integer currentRanking = null;
    private Integer currentScore = null;


    @Override
    public FriendRanking process(FriendScoreRow item) {
        Long memberIdx = item.getMemberIdx();
        ;
        Long friendIdx = item.getFriendIdx();
        ;

        //신규 진입 이거나 member가 바뀐 경우 랭킹 조기화 진행
        if (Objects.isNull(currentMemberIdx) || !currentMemberIdx.equals(memberIdx)) {
            currentMemberIdx = memberIdx;
            currentRanking = 1;
            currentScore = item.getScore();
        } else {
            if (!Objects.equals(currentScore, item.getScore())) {
                currentRanking++;
                currentScore = item.getScore();
            }
        }

        return FriendRanking.builder()
                .memberIdx(memberIdx)
                .friendIdx(item.getFriendIdx())
                .score(item.getScore())
                .ranking(currentRanking)
                .build();
    }
}

 

 

Processor에서 rank를 계산하기 위해서 현시점에 member_idx와 rank, score를 기록을 하는 저장하는 방식으로 랭킹을 계산했습니다. 만약 동점 처리 정책이 변경이 된다면 해당 부분에서 쉽게 변경 가능 할 것입니다. 

 

 

 

Writer에는 재실행을 통한 멱등성 보장 

재실행을 고려하면 아래와 같이 duplicate key update, ignore를 통해서 보장해줄 수 있습니다. 

 

INSERT INTO friend_rank (member_idx, friend_idx, score, ranking) 
VALUES (:memberIdx, :friendIdx, :score, :ranking)  
ON DUPLICATE KEY UPDATE
vp.score = new.score,
vp.ranking = new.ranking

 

하지만 duplicate key update 진행 했을 경우의 문제는 아래와 같습니다 .

 

 

[Dead Lock 이유]

파티션-a와 파티션-b로 동작하고 있다고 가정하고 각 파티션에서는 아래와 같은 동작을 할 것입니다. 

 

파티션-a : 

  • 1~1000번을 담당한다
  • 1000번을 진행한다. 
  • 유니크 제약 조건에 의해서 해당 레코드가 있는지 확인 하면서 1000번에 s-lock을 건다
  • 값이 없으면 값을 넣기 위해 1000번 s-lock에서 x-lock으로 변경 함과 동시에 Gap-Lock (1000~1001번) 을 획득하려 시도한다.

 

파티션-b : 

  • 1001~2000번을 담당한다
  • 1001번을 진행한다. 
  • 유니크 제약 조건에 의해서 해당 레코드가 있는지 확인 하면서 1001번에 s-lock을 건다
  • 값이 없으면 값을 넣기 위해 1001번 s-lock에서 x-lock으로 변경 함과 동시에 Gap-Lock (1000~1001번) 을 획득하려 시도한다.

 

 

결국 경계지점(1000,1001)에서는 insert를 진행하기 위해 gap락을 걸려다 보니 서로의 범위를 침범하게 되고 복잡한 락을 걸려고 시도하다가 Dead Lock이 발생한것입니다. 그래서 duplicate 방식이 아닌 delete 후 insert 하는 방식으로 변경을 했습니다

 

@Slf4j
@Component
@StepScope
@RequiredArgsConstructor
public class FriendRankDeletionWriter implements ItemWriter<FriendRanking> {

    private final JdbcTemplate jdbcTemplate;

    @Value("#{stepExecutionContext['startIndex']}")
    private Long startIndex;

    @Value("#{stepExecutionContext['endIndex']}")
    private Long endIndex;

    private boolean deleted = false;

    @Override
    public void write(Chunk<? extends FriendRanking> items) throws Exception {
        // 이 로직은 해당 Worker Step의 첫 번째 Chunk 트랜잭션에서만 실행되도록 보장해야 합니다.
        if (!deleted) {
            String sql = "DELETE FROM friend_rank WHERE member_idx BETWEEN ? AND ?";
            int count = jdbcTemplate.update(sql, startIndex, endIndex);

            // 첫 번째 Chunk에서만 삭제가 이루어지도록 플래그 설정
            deleted = true;
            log.info("Partition Range [{}, {}]: Deleted {} existing friend_rank records.",
                    startIndex, endIndex, count);
        }
    }
}
@Configuration
public class FriendRankWriterConfig {

    @Bean
    public JdbcBatchItemWriter<FriendRanking> friendRankJdbcBatchItemWriter(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<FriendRanking>()
                .dataSource(dataSource)
                .sql("""
                        INSERT INTO friend_rank (member_idx, friend_idx, score, ranking) 
                        VALUES (:memberIdx, :friendIdx, :score, :ranking)  
                        ON DUPLICATE KEY UPDATE
                         score = values(score),
                         ranking = values(ranking) 
                       
                        """)
                .beanMapped()
                .build();
    }

    @Bean
    public CompositeItemWriter<FriendRanking> compositeWriter(
            ItemWriter<FriendRanking> friendRankDeletionWriter,
            JdbcBatchItemWriter<FriendRanking> friendRankJdbcBatchItemWriter
    ) {
        CompositeItemWriter<FriendRanking> writer = new CompositeItemWriter<>();
        writer.setDelegates(List.of(
                friendRankDeletionWriter,
                friendRankJdbcBatchItemWriter
        ));
        return writer;
    }
}

 

 

 

 

 

그러면 왜 delete 후 insert는 dead lock에 걸리지 않을까요? 이유는 아래와 같습니다. 

  • delete 진행시 x-lock을 통해서 락을 건다. 
  • 파티션-a 의 경우 1~1000까지 x-lock을 건다
  • 파티션-a는 partion 범위에 따라 지정된 1~1000을 insert한다
  • 파티션-b 의 경우 1001~2000까지 x-lock을 건다
  • 파티션-b는 partion 범위에 따라 지정된 1001~2000을 insert 한다. 

즉 delete시 x-lock을 걸어서 확보를 해놓은 공간에 insert를 진행하기 때문에 dead lock이 발생하지 않습니다.

 

 

 

 

최종 Job에 대한 설정은?

 

그러면 이제 마지막으로 Job에 대해서 구성을 해보기 전에  위에서 "커넥션 풀과 parition 수를 잘 구성해야한다"  라고 했던 이유에 대해서 알아보려고 합니다. 

 

[이유]

  • JdbcCursorItemReader는 Step 전체 동안 커넥션 1개를 고정 점유합니다.
  • Partiton 수 N 만큼 최소 N개의 커넥션이 항상 사용중입니다. 
  • 기타 다른 step이나, writer에서는 커넥션이 별도로 필요 할 수 있습니다. 

 

특히 PagingReader의 경우 보통 chunk 단위로 트랜잭션을 시작을 해서 "DataSourceUtils.getConnection()" 을 통해서 동일 커넥션을 사용을 합니다. 하지만 JdbcCursorItemReader의 경우는 커서기반으로 동작하기 위해서 커넥션을 유지를 계속 해야 하며 트랜잭션 경계 밖에서 커넥션을 얻기 때문에 writer에서는 동일 커넥션을 공유하지 못해서 커넥션이 추가로 더 소요가 됩니다. 

 

 

 

예들들어 

  • Hikari maximimPoolSize = 20
  • gridSize = 5, ThreadPool = 5

 

인 경우  cursor가 5개 커넥션을 잡고 나머머지 15개로 다른작업을 처리할 수 있습니다. 반대로 "Hikari maximimPoolSize = 5" 에 "gridSize = 5" 면, 다른 벌써 cursor에서 5개를 사용해버려서 아래 에러가 발생하게 됩니다.

 

 

 

 

그래서 추천하는건 : 

  • Hikari maximumPoolSize ≥ partition(gridSize) + 여유(3~5)

로 설정하면 좀더 안정적으로 운영 할 수 있으실거라고 생가합니다. 그럼  job을 구성해보겠습니다. 

 

 

@Slf4j
@Configuration
@RequiredArgsConstructor
public class FriendRankingJobConfig {

    private final int CHUNK_SIZE = 2000;

    @Bean
    public Job friendRankingJob(
            JobRepository jobRepository,
            Step friendRankingMasterStep
    ) {
        return new JobBuilder("friendRankingJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(friendRankingMasterStep)
                .build();
    }


    @Bean
    public Step friendRankingMasterStep(
            JobRepository jobRepository,
            FriendRankingPartitioner friendRankingPartitioner,
            PartitionHandler friendRankingPartitionHandler
    ) {
        return new StepBuilder("friendRankingMasterStep", jobRepository)
                .partitioner("friendRankWorkerStep", friendRankingPartitioner)
                .partitionHandler(friendRankingPartitionHandler)
                .build();
    }

    @Bean
    public Step friendRankingWorkerStep(
            JobRepository jobRepository,
            PlatformTransactionManager platformTransactionManager,
            JdbcCursorItemReader<FriendScoreRow> friendScoreCursorReader,
            FriendRankingProcessor friendRankingProcessor,
            CompositeItemWriter<FriendRanking> compositeWriter

    ) {
        return new StepBuilder("friendRankingWorkerStep", jobRepository)
                .<FriendScoreRow, FriendRanking>chunk(CHUNK_SIZE, platformTransactionManager)
                .reader(friendScoreCursorReader)
                .processor(friendRankingProcessor)
                .writer(compositeWriter)
                .build();

    }

    @Bean
    public PartitionHandler friendRankingPartitionHandler(
            @Qualifier("friendRankingWorkerStep") Step friendRankingWorkerStep,
            @Qualifier("friendRankingExecutor") TaskExecutor friendRankingExecutor
    ) {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setStep(friendRankingWorkerStep);
        handler.setTaskExecutor(friendRankingExecutor);
        handler.setGridSize(5);
        return handler;
    }

    @Bean
    public TaskExecutor friendRankingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(0);
        executor.setThreadNamePrefix("friend-rank-");
        executor.initialize();
        return executor;
    }

   

}

 

 

 

 

저는 실제 이와 같은 방식으로 일일 1400만건 랭킹 배치를 설계 및 개발을 진행한적이 있습니다. 해당 작업을 진행하면서 겪었던 이슈를 한번 정리할겸 이렇게 작성을 하게되었습니다. 부족한점이 있지만 좋게 봐주셨으면 좋겠습니다.