[Spring-Batch] 대용량 랭킹 배치 설계
안녕하세요 오늘은 대용량 랭킹 배치에 대해서 알아보려고 합니다. 해당 랭킹 배치는 진행하던 프로젝트에서 경험 한 내용으로 제가 생각하던 것을 공유 하고 싶어 진행하게 되었습니다.
제가 진행하던 랭킹은 나와 내친구들을 기준으로 하는 개인화 랭킹이였습니다. 해당 기능은 아래와 같은 조건이 있습니다.
- 회원 : 약 390만
- 친구 관계 : 회원마다 친구 수가 제각각 (특정 회원의 경우 친구가 3만명)
- 친구 테이블 수 : 본인 * 친구들 = 약 1400만(?)
- 본인 + 친구들 집하에 대해 랭킹을 계산해야함
- 랭킹 결과를 테이블에 적재한다.
해당 조건을 기준으로 "대용량 친구 랭킹 배치" 를 어떻게 설계하면 안전하고 성능 이슈 없이 돌아가는지 정리해보겠습니다.
스키마 설계

-- 회원
create table member
(
member_idx int unsigned auto_increment
primary key,
name varchar(50) not null,
created_at datetime default CURRENT_TIMESTAMP not null
);
-- 친구
create table friend
(
idx int unsigned auto_increment
primary key,
member_idx int unsigned not null,
friend_idx int unsigned not null,
created_at datetime default CURRENT_TIMESTAMP not null,
constraint uk_friend
unique (member_idx, friend_idx)
);
create index idx_friend_friend
on friend (friend_idx);
create index idx_friend_member
on friend (member_idx);
-- 친구 랭킹
create table friend_rank
(
idx int unsigned auto_increment
primary key,
member_idx int unsigned not null,
friend_idx int unsigned not null,
score int not null,
ranking int not null,
updated_at datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP,
constraint friend_rank_pk
unique (member_idx, friend_idx)
);
create index idx_rank_member
on friend_rank (member_idx);
create index idx_rank_member_ranking
on friend_rank (member_idx, ranking);
-- 스코어
create table score
(
idx int unsigned auto_increment
primary key,
member_idx bigint unsigned not null,
score int default 0 not null,
updated_at datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP
);
create index idx_score_member
on score (member_idx);
[조건]
비슷한 환경을 구성하기 위해 다음을 전제로 진행했습니다.
- friend 테이블에는 친구 관계만 존재한다
- 본인은 friend 테이블에는 존재 하지 않는다.
그래서 랭킹을 계산할떄는 :
- 나 + 내친구들 집합에 대해 랭킹을 계산해야 함으로 friend + member(자기자신)을 합쳐서 처리한다.
"friend" 테이블에 자기 자신 row를 넣어주면 설계가 훨씬 깔끔하지만 운영 중인 스키마를 기준으로 설계를 진행했습니다.
여기서 "본인을 포함한 랭킹" 을 어떻게 만들까?
friend 테이블에는 본인이 없기 때문에, 각 회원의 개인화 랭킹은 아래와 같이 구성하려고 합니다

[의도]
- friend 테이블에는 친구만 있으므로, 별도의 select로 본인을 합쳐야한다.
- UNION ALL을 사용해서 "친구 스코어 + 본인스코어"를 하나의 스트림으로 만든다
- member_idx와 범위 조건 between에 인덱스가 있으면 범위 스캔으로 처리된다.
병렬을 위한 Partitioner 사용 이유
크게 병렬로 돌릴 수 있는 방법은 Multi-threaded Step 과 Partitioner가 존재합니다. 그중 partioner을 사용한 이유는 정리하면 이렇습니다.
- 회원 390만명을 단일 Step, 단일 thread로 처리하면 시간이 너무 오래 걸린다.
- member_idx는 PK 이므로, 범위를 잘 쪼개기 쉽다.
- 따라서 member_idx 범위를 파티션으로 나누고, 각 파티션을 병렬로 처리하면 전체 소요시간을 줄일 수 있다.
@Component
@RequiredArgsConstructor
public class FriendRankingPartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> executionContextMap = new HashMap<>();
String sql = """
select MIN(member_idx) as minMemberIndex, MAX(member_idx) as maxMemberIndex from friend
""";
FriendMinMaxDto friendMinMaxDto = jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(FriendMinMaxDto.class))
.stream()
.findFirst()
.orElse(null);
if (Objects.isNull(friendMinMaxDto) || Objects.isNull(friendMinMaxDto.minMemberIndex) || Objects.isNull(friendMinMaxDto.maxMemberIndex)) {
return executionContextMap;
}
// 나누려는 개수
long size = (friendMinMaxDto.maxMemberIndex - friendMinMaxDto.minMemberIndex) / gridSize + 1;
// 시작 index
long start = friendMinMaxDto.getMinMemberIndex();
// 종료 index
long end = start + size - 1;
for (int i = 0; i < gridSize; i++) {
// 불필요한 파티션 생성을 막기 위함
if (start > friendMinMaxDto.maxMemberIndex) break;
// 마지막의 경우 마지막 index로 end를 설정하기 위함
if (end > friendMinMaxDto.maxMemberIndex) {
end = friendMinMaxDto.maxMemberIndex;
}
ExecutionContext executionContext = new ExecutionContext();
executionContext.putLong("startIndex", start);
executionContext.putLong("endIndex", end);
executionContextMap.put("partition" + i, executionContext);
start = end + 1;
end = start + size - 1;
}
return executionContextMap;
}
@Setter
@Getter
public static class FriendMinMaxDto {
private Integer minMemberIndex;
private Integer maxMemberIndex;
}
}
이렇게 하면 390만(?)명을 N개의 파티션으로 나누어서, 각 step은 390만/N + 1로 처리를 진행하게 됩니다.
Reader로 Cursor를 사용한 이유
크게 보면 reader는 paging 과 cursor가 존재합니다. 그중 저는 cursor를 선택했는데 이유를 정리하면 이렇습니다.
- 랭킹 계산은 member_idx + score DESC 기준으로 정렬된 데이터를 스트림으로 처리하는 게 가장 깔끔하다.
- Cursor 기반으로 한 번 정렬된 결과를 받아서, Processor에서 dense rank를 계산하기 쉽다.
- Paging(OFFSET/LIMIT)으로 랭킹을 나누면 페이징 키 / 중복 처리 / 정렬 안정성 이슈를 신경 써야 한다
물론 단점도 존재합니다 단점은 이렇습니다
- 각 Step이 커넥션을 Step 전체 동안 점유하기 때문에
- Partition 수 > 커넥션 풀 크기이면 바로 connection timeout이 난다.
때문에 저는 정렬된 스트림을 한번에 처리하고 싶었기 때문에 cursor를 선택하였습니다. 물론 이경우 신경 써야 하는 부분은 커넥션 풀과 parition 수를 잘 맞춰서 진행해야 합니다. 해당 풀에 대한 내용은 아래에서 정리 하기로 하고 우선 reader 부분을 보겠습니다.
@Configuration
public class FriendRankingReaderConfig {
private final static String sql = """
(
SELECT
f.member_idx AS memberIdx,
f.friend_idx AS friendIdx,
COALESCE(s.score, 0) AS score
FROM friend f
LEFT JOIN score s
ON s.member_idx = f.friend_idx
WHERE f.member_idx BETWEEN ? AND ?
)
UNION ALL
(
SELECT
m.member_idx AS memberIdx,
m.member_idx AS friendIdx,
COALESCE(s.score, 0) AS score
FROM member m
LEFT JOIN score s
ON s.member_idx = m.member_idx
WHERE m.member_idx BETWEEN ? AND ?
)
ORDER BY
memberIdx,
score DESC,
friendIdx
""";
@StepScope
@Bean
public JdbcCursorItemReader<FriendScoreRow> friendScoreCursorReader(
DataSource dataSource,
@Value("#{stepExecutionContext['startIndex']}") Long startIndex,
@Value("#{stepExecutionContext['endIndex']}") Long endIndex
){
return new JdbcCursorItemReaderBuilder<FriendScoreRow>()
.dataSource(dataSource)
.name("friendScoreCursorReader")
.sql(sql)
.rowMapper((rs, rowNum) -> {
return FriendScoreRow.builder()
.friendIdx(rs.getLong("friendIdx"))
.memberIdx(rs.getLong("memberIdx"))
.score(rs.getInt("score"))
.build();
})
.preparedStatementSetter(ps -> {
ps.setLong(1, startIndex);
ps.setLong(2, endIndex);
ps.setLong(3, startIndex);
ps.setLong(4, endIndex);
})
.build();
}
}
각 파티션에서 들고 있는 startIndex, endIndex를 기준으로 between을 통해 범위 스캔이 진행이되며, order by를 통해 정렬된 결과를 curosr로 스트리밍 받아 processor에서 랭킹을 연속적으로 계산하게 됩니다.
왜 Processor에서 직접 dense rank를 계산하는가?
물론 rank와 같은 window function으로 db에서 바로 랭킹을 계산할 수도 있습니다. 하지만 아래와 같은 이유로 Processor에서 처리하는 방식을 선택했습니다.
- Mysql 버전에따라 대용량 window function 성능/메모리 사용이 불안정하다
- 비즈니스 로직으로 동정 처리 방법등을 Java 코드에서 쉽게 바고 싶다.(query는 강결합되어 있음..)
때문에 Processor에서 직접 rank를 계산하는 쪽을 선택했습니다.
@Builder
@Getter
public class FriendScoreRow {
private Long memberIdx;
private Long friendIdx;
private Integer score;
}
@ToString
@Builder
@Getter
public class FriendRanking {
private Long memberIdx;
private Long friendIdx;
private Integer score;
private Integer ranking;
}
@StepScope
@Component
public class FriendRankingProcessor implements ItemProcessor<FriendScoreRow, FriendRanking> {
private Long currentMemberIdx = null;
private Integer currentRanking = null;
private Integer currentScore = null;
@Override
public FriendRanking process(FriendScoreRow item) {
Long memberIdx = item.getMemberIdx();
;
Long friendIdx = item.getFriendIdx();
;
//신규 진입 이거나 member가 바뀐 경우 랭킹 조기화 진행
if (Objects.isNull(currentMemberIdx) || !currentMemberIdx.equals(memberIdx)) {
currentMemberIdx = memberIdx;
currentRanking = 1;
currentScore = item.getScore();
} else {
if (!Objects.equals(currentScore, item.getScore())) {
currentRanking++;
currentScore = item.getScore();
}
}
return FriendRanking.builder()
.memberIdx(memberIdx)
.friendIdx(item.getFriendIdx())
.score(item.getScore())
.ranking(currentRanking)
.build();
}
}
Processor에서 rank를 계산하기 위해서 현시점에 member_idx와 rank, score를 기록을 하는 저장하는 방식으로 랭킹을 계산했습니다. 만약 동점 처리 정책이 변경이 된다면 해당 부분에서 쉽게 변경 가능 할 것입니다.
Writer에는 재실행을 통한 멱등성 보장
재실행을 고려하면 아래와 같이 duplicate key update, ignore를 통해서 보장해줄 수 있습니다.
INSERT INTO friend_rank (member_idx, friend_idx, score, ranking)
VALUES (:memberIdx, :friendIdx, :score, :ranking)
ON DUPLICATE KEY UPDATE
vp.score = new.score,
vp.ranking = new.ranking
하지만 duplicate key update 진행 했을 경우의 문제는 아래와 같습니다 .

[Dead Lock 이유]
파티션-a와 파티션-b로 동작하고 있다고 가정하고 각 파티션에서는 아래와 같은 동작을 할 것입니다.
파티션-a :
- 1~1000번을 담당한다
- 1000번을 진행한다.
- 유니크 제약 조건에 의해서 해당 레코드가 있는지 확인 하면서 1000번에 s-lock을 건다
- 값이 없으면 값을 넣기 위해 1000번 s-lock에서 x-lock으로 변경 함과 동시에 Gap-Lock (1000~1001번) 을 획득하려 시도한다.
파티션-b :
- 1001~2000번을 담당한다
- 1001번을 진행한다.
- 유니크 제약 조건에 의해서 해당 레코드가 있는지 확인 하면서 1001번에 s-lock을 건다
- 값이 없으면 값을 넣기 위해 1001번 s-lock에서 x-lock으로 변경 함과 동시에 Gap-Lock (1000~1001번) 을 획득하려 시도한다.
결국 경계지점(1000,1001)에서는 insert를 진행하기 위해 gap락을 걸려다 보니 서로의 범위를 침범하게 되고 복잡한 락을 걸려고 시도하다가 Dead Lock이 발생한것입니다. 그래서 duplicate 방식이 아닌 delete 후 insert 하는 방식으로 변경을 했습니다.
@Slf4j
@Component
@StepScope
@RequiredArgsConstructor
public class FriendRankDeletionWriter implements ItemWriter<FriendRanking> {
private final JdbcTemplate jdbcTemplate;
@Value("#{stepExecutionContext['startIndex']}")
private Long startIndex;
@Value("#{stepExecutionContext['endIndex']}")
private Long endIndex;
private boolean deleted = false;
@Override
public void write(Chunk<? extends FriendRanking> items) throws Exception {
// 이 로직은 해당 Worker Step의 첫 번째 Chunk 트랜잭션에서만 실행되도록 보장해야 합니다.
if (!deleted) {
String sql = "DELETE FROM friend_rank WHERE member_idx BETWEEN ? AND ?";
int count = jdbcTemplate.update(sql, startIndex, endIndex);
// 첫 번째 Chunk에서만 삭제가 이루어지도록 플래그 설정
deleted = true;
log.info("Partition Range [{}, {}]: Deleted {} existing friend_rank records.",
startIndex, endIndex, count);
}
}
}
@Configuration
public class FriendRankWriterConfig {
@Bean
public JdbcBatchItemWriter<FriendRanking> friendRankJdbcBatchItemWriter(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<FriendRanking>()
.dataSource(dataSource)
.sql("""
INSERT INTO friend_rank (member_idx, friend_idx, score, ranking)
VALUES (:memberIdx, :friendIdx, :score, :ranking)
ON DUPLICATE KEY UPDATE
score = values(score),
ranking = values(ranking)
""")
.beanMapped()
.build();
}
@Bean
public CompositeItemWriter<FriendRanking> compositeWriter(
ItemWriter<FriendRanking> friendRankDeletionWriter,
JdbcBatchItemWriter<FriendRanking> friendRankJdbcBatchItemWriter
) {
CompositeItemWriter<FriendRanking> writer = new CompositeItemWriter<>();
writer.setDelegates(List.of(
friendRankDeletionWriter,
friendRankJdbcBatchItemWriter
));
return writer;
}
}
그러면 왜 delete 후 insert는 dead lock에 걸리지 않을까요? 이유는 아래와 같습니다.
- delete 진행시 x-lock을 통해서 락을 건다.
- 파티션-a 의 경우 1~1000까지 x-lock을 건다
- 파티션-a는 partion 범위에 따라 지정된 1~1000을 insert한다
- 파티션-b 의 경우 1001~2000까지 x-lock을 건다
- 파티션-b는 partion 범위에 따라 지정된 1001~2000을 insert 한다.
즉 delete시 x-lock을 걸어서 확보를 해놓은 공간에 insert를 진행하기 때문에 dead lock이 발생하지 않습니다.
최종 Job에 대한 설정은?
그러면 이제 마지막으로 Job에 대해서 구성을 해보기 전에 위에서 "커넥션 풀과 parition 수를 잘 구성해야한다" 라고 했던 이유에 대해서 알아보려고 합니다.
[이유]
- JdbcCursorItemReader는 Step 전체 동안 커넥션 1개를 고정 점유합니다.
- Partiton 수 N 만큼 최소 N개의 커넥션이 항상 사용중입니다.
- 기타 다른 step이나, writer에서는 커넥션이 별도로 필요 할 수 있습니다.
특히 PagingReader의 경우 보통 chunk 단위로 트랜잭션을 시작을 해서 "DataSourceUtils.getConnection()" 을 통해서 동일 커넥션을 사용을 합니다. 하지만 JdbcCursorItemReader의 경우는 커서기반으로 동작하기 위해서 커넥션을 유지를 계속 해야 하며 트랜잭션 경계 밖에서 커넥션을 얻기 때문에 writer에서는 동일 커넥션을 공유하지 못해서 커넥션이 추가로 더 소요가 됩니다.
예들들어
- Hikari maximimPoolSize = 20
- gridSize = 5, ThreadPool = 5
인 경우 cursor가 5개 커넥션을 잡고 나머머지 15개로 다른작업을 처리할 수 있습니다. 반대로 "Hikari maximimPoolSize = 5" 에 "gridSize = 5" 면, 다른 벌써 cursor에서 5개를 사용해버려서 아래 에러가 발생하게 됩니다.

그래서 추천하는건 :
- Hikari maximumPoolSize ≥ partition(gridSize) + 여유(3~5)
로 설정하면 좀더 안정적으로 운영 할 수 있으실거라고 생가합니다. 그럼 job을 구성해보겠습니다.
@Slf4j
@Configuration
@RequiredArgsConstructor
public class FriendRankingJobConfig {
private final int CHUNK_SIZE = 2000;
@Bean
public Job friendRankingJob(
JobRepository jobRepository,
Step friendRankingMasterStep
) {
return new JobBuilder("friendRankingJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(friendRankingMasterStep)
.build();
}
@Bean
public Step friendRankingMasterStep(
JobRepository jobRepository,
FriendRankingPartitioner friendRankingPartitioner,
PartitionHandler friendRankingPartitionHandler
) {
return new StepBuilder("friendRankingMasterStep", jobRepository)
.partitioner("friendRankWorkerStep", friendRankingPartitioner)
.partitionHandler(friendRankingPartitionHandler)
.build();
}
@Bean
public Step friendRankingWorkerStep(
JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager,
JdbcCursorItemReader<FriendScoreRow> friendScoreCursorReader,
FriendRankingProcessor friendRankingProcessor,
CompositeItemWriter<FriendRanking> compositeWriter
) {
return new StepBuilder("friendRankingWorkerStep", jobRepository)
.<FriendScoreRow, FriendRanking>chunk(CHUNK_SIZE, platformTransactionManager)
.reader(friendScoreCursorReader)
.processor(friendRankingProcessor)
.writer(compositeWriter)
.build();
}
@Bean
public PartitionHandler friendRankingPartitionHandler(
@Qualifier("friendRankingWorkerStep") Step friendRankingWorkerStep,
@Qualifier("friendRankingExecutor") TaskExecutor friendRankingExecutor
) {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setStep(friendRankingWorkerStep);
handler.setTaskExecutor(friendRankingExecutor);
handler.setGridSize(5);
return handler;
}
@Bean
public TaskExecutor friendRankingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(0);
executor.setThreadNamePrefix("friend-rank-");
executor.initialize();
return executor;
}
}
저는 실제 이와 같은 방식으로 일일 1400만건 랭킹 배치를 설계 및 개발을 진행한적이 있습니다. 해당 작업을 진행하면서 겪었던 이슈를 한번 정리할겸 이렇게 작성을 하게되었습니다. 부족한점이 있지만 좋게 봐주셨으면 좋겠습니다.