Spring Batch - Rest API JOB 구성
프로젝트를 진행하면서 Batch를 구성해야하는 경우가 많은데요. 대부분 Job을 구성 할때 DB를 조회하는 JdbcItemReader, Jdbccursoritemreader, Jdbcpagingitemreader등을 이용해서 만드는 경우가 많았습니다. 간혹 저의 경우 외부 Data를 받기 위해 API를 호출하고 받은 Response Data를 저희 DB에 넣어서 동기화하는 Job을 구성해야 하는 경우가 있었는데요. 대부분은 DB에서 read해서 Db에 write를 하는 글은 많이 보이지만 외부 연동이 들어가는 Job은 어떻게 구성을 해야하는지 잘 보이지 않아 공부 할 겸 글로 작성해보려고 합니다. 틀린 부분 이 있을 수 있으니 감안해주시면 감사하겠습니다.
요구조건
- 외부 Rest API를 통해 데이터를 받아야한다.
- 해당 API는 페이징 기능이 들어가 있어 Page,size, 조회날짜 등을 파라미터로 받는다
- 파라미터로 날짜를 받고. 날짜 범위검색이 불가능하여 특정일만 조회가능하다
ex) 2022-01-01~2022-03-01 (x) - A날짜 ~ B날짜의 데이터를 받아올 수 있어야한다.
Flow
- JobParameter로 startDate, EndDate를 받는다.
- reader에서는 startDate ~ endDate까지 RestApi를 요청한다
- chunk size가 되면 writer를 진행한다
Job
위와같은 요구조건을 만족하는 Job이 있어야 하니 Job을 만들어 보겠습니다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ExtRestApiJob {
private final String JOB_NAME = "EXTERNAL_REST_API_JOB";
private final String STEP_NAME = JOB_NAME + "STEP";
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
private final RequestDateJobParameter requestDateJobParameter;
private final int chunkSize = 10;
@Bean(name = JOB_NAME)
public Job externalTradeJob() throws Exception {
return jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.listener(new CustomJobListener())
.start(externalTradeStep())
.build();
}
@Bean(name = STEP_NAME)
public Step externalTradeStep() {
return stepBuilderFactory.get(STEP_NAME)
.<ExternalRestApiDto, ExternalRestApiDto>chunk(chunkSize)
.reader(externalRestApiReader())
.processor(externalRestApiProcessor())
.writer(externalRestApiWriter())
.build();
}
@StepScope
@Bean
public ExtRestApiReader<ExternalRestApiDto> externalRestApiReader() {
return new ExtRestApiReader<>(
ExternalRestApiDto.class,
requestDateJobParameter.getStartDate(),
requestDateJobParameter.getEndDate(),
1);
}
@Bean
ItemProcessor<ExternalRestApiDto, ExternalRestApiDto> externalRestApiProcessor() {
return item -> {
log.info("item : {}" , item);
return item;
};
}
@Bean
public ItemWriter<ExternalRestApiDto> externalRestApiWriter() {
String sql =
"INSERT external_table(\n" +
"name,\n" +
"age\n" +
") values (\n" +
":name,\n" +
":age\n" +
")\n" +
"ON DUPLICATE KEY UPDATE\n" +
"name = :name,\n" +
"age = :age";
return new JdbcBatchItemWriterBuilder<ExternalRestApiDto>()
.dataSource(dataSource)
.beanMapped()
.sql(sql)
.build();
}
}
위의 Job은 chunk 단위로 동작을 합니다. reader를 통해 데이터를 가져오고 writer를 통해 chunk 단위로 Insert를 진행합니다. 대부분은 비슷한데 저는 RequestDataJobParmeter, ExtRestApiReader를 추가 했습니다.
RequestDataJobParmeter
우선 RequestDataJobParmeter는 JobParameter로 받은 String 타입의 date를 LocalDate로 변환하고 그걸 Job에서 사용하기 하기 위함입니다. startDate는 시작날짜 이고 endDate는 종료 날짜 입니다. 때문에 JOB을 실행시킬때 JobParameter로 startDate,endDate를 넘겨 startDate~endDate 기간의 Data를 가져와 write를 하게 됩니다.
@Slf4j
@Getter
@NoArgsConstructor
public class RequestDateJobParameter {
private LocalDate startDate;
private LocalDate endDate;
//시작날짜
@Value("#{jobParameters[startDate]}")
public void setStartDate(String startDate) {
this.startDate = StringUtils.hasText(startDate) ?
LocalDate.parse(startDate, DateTimeFormatter.ofPattern("yyyy-MM-dd")) : LocalDate.now();
}
//종료날짜
@Value("#{jobParameters[endDate]}")
public void setEndDate(String endDate) {
this.endDate = StringUtils.hasText(endDate) ?
LocalDate.parse(endDate, DateTimeFormatter.ofPattern("yyyy-MM-dd")) : LocalDate.now();
}
}
ExtRestApiReader
ExtRestApiReader은 Custom한 Reader로 파라미터로 외부API를 통해 응답받을 ResponseDto와 , 조회기간인 startDate , endDate, 페이지의 단위인 PAZE_SIZE를 받고 있습니다. 자세한 내용은 아래의 Reader를 구성하는 부분에 설명드리겠습니다.
public class ExternalRestApiResult<T> {
private String resultCode;
private List<T> data;
public ExternalRestApiResult(String resultCode, List<T> data) {
this.resultCode = resultCode;
this.data = data;
}
}
public class ExternalRestApiDto {
private String name;
private int age;
}
ExternalRestApiResult는 resultCode는 결과에 따른 성공, 실패 응답 값을 의미하고 data는 list 결과값을 의미합니다.
writer에서는 ExternalRestApiDto를 가지고 insert를 진행합니다.
ExternalRestApiWriter
wrtier 부분은 중복이 일어나지 않게 Step Flow를 구성할 수 있지만 upsert를 진행해서 해결하는걸로 진행했습니다.
그러면 이젠 가장 중요한 Reader를 만들겠습니다.
Reader
요구조건을 만족해야 하기 때문에 흔히 쓰는 Jdbccursoritemreader, Jdbcpagingitemreader은 사용하지 않고 별도 reader를 만들어야 합니다.
ExtRestApiReader는 ItemReder를 구현한 구현체 클래스 입니다. ItemStreamReder를 상속 받고 실패한 시점부터 재실행 가능하게 할 수 있지만 해당 기능은 ItemReader로도 충분하다고 판단했습니다. ItemStreamReder은 어떻게 사용 할 수 있는 지는 다음에 기회가 있으면 작성하겠습니다.
우선 ExtRestApiReader는 아래와 같은 생성자를 통해 생성이 되는데요. clazz는 외부 RestApi Response를 받는 Class의 타입을 의미하고 나머지는 RestApi 호출에 필요한 날짜, 페이지의 사이즈를 의미합니다. 날짜 파리미터인 START_DATE, END_DATE는 특정 날짜만 받는 API를 범위 검색으로 요청하여 Response 받기 위해 사용했습니다.
@Slf4j
public class ExtRestApiReader<T> implements ItemReader<T> {
private final int PAGE_SIZE;
private final LocalDate START_DATE;
private final LocalDate END_DATE;
private final Class<T> clazz;
private int currentIdx;
private int currentPage = 1;
private List<T> readData = new ArrayList<>();
private LocalDate currentDate;
private Integer excuteCount = 0;
private final Integer FINISH_COUNT = 500;
public ExtRestApiReader(Class<T> clazz, LocalDate START_DATE, LocalDate END_DATE, int PAGE_SIZE) {
this.START_DATE = START_DATE;
this.END_DATE = END_DATE;
this.currentDate = START_DATE;
this.PAGE_SIZE = PAGE_SIZE;
this.clazz = clazz;
}
@Override
public T read() throws Exception {
if (readData.isEmpty() || currentIdx >= readData.size()) {
fetchNextPage();
}
if (readData.isEmpty() && this.currentDate.isBefore(END_DATE)) {
fetchNextDateOrEndDate();
}
if (!this.readData.isEmpty() && currentIdx < readData.size()) {
return readData.get(currentIdx++);
}
return null;
}
/**
* 다음 날짜를 조회하는데 데이터가 없으면 마지막 날까지 반복 수행하고
* 마지막까지 없으면 빈 배열 리턴 하고 currentPage + 1 진행
*
* @throws JsonProcessingException
*/
private void fetchNextDateOrEndDate() throws JsonProcessingException {
List<T> result = fetchNextDate();
while (result.isEmpty() && (this.currentDate.isBefore(END_DATE) || this.currentDate.isEqual(END_DATE))) {
result = fetchNextDate();
}
this.readData = result;
updateInitIdxAndNextPage();
}
/**
* currentPage 와 currentIdx를 초기화 하고 다음 날짜로 변경하여
* 다음 날짜의 데이터 요청
*
* @return
* @throws JsonProcessingException
*/
private List<T> fetchNextDate() throws JsonProcessingException {
updateInitPageAndNextDate();
return fetchApiData();
}
private void updateInitPageAndNextDate() {
initPage();
this.currentDate = this.currentDate.plusDays(1);
}
/**
* 다음 페이지 조회
*
* @throws JsonProcessingException
*/
private void fetchNextPage() throws JsonProcessingException {
this.readData = fetchApiData();
updateInitIdxAndNextPage();
}
private void updateInitIdxAndNextPage() {
this.currentIdx = 0;
this.currentPage++;
}
public List<T> fetchApiData() throws JsonProcessingException {
checkLoop();
return getExternalData(currentDate);
}
//외부 API 호출
private List<T> getExternalData(LocalDate currentDate) throws JsonProcessingException {
ExternalReq req = ExternalReq.builder().date(
currentDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
.page(currentPage)
.pageSize(PAGE_SIZE)
.build();
log.info("[req data] : {}", req);
ExternalRestApiResult<T> res = new ObjectMapper().readValue(
getRestApiData(req),
new ObjectMapper().getTypeFactory().constructParametricType(ExternalRestApiResult.class, clazz));
return Optional.ofNullable(res).map(ExternalRestApiResult::getData).orElse(new ArrayList<>());
}
private void checkLoop() {
if (excuteCount++ > FINISH_COUNT) {
throw new RuntimeException("무한 루프 판단 Exception!!");
}
}
private void nextPage() {
this.currentPage++;
}
private void initPage() {
this.currentPage = 1;
}
public String getRestApiData(ExternalReq req) throws JsonProcessingException {
List<ExternalRestApiDto> data = null;
if(req.getDate().equals("2023-03-01")) {
if (currentPage == 1) {
data = IntStream.rangeClosed(1, 5).mapToObj(i -> ExternalRestApiDto.builder().age(i).name("pooney" + i).build()).collect(Collectors.toList());
} else if (currentPage == 2) {
data = IntStream.rangeClosed(6, 8).mapToObj(i -> ExternalRestApiDto.builder().age(i).name("pooney" + i).build()).collect(Collectors.toList());
} else {
data = new ArrayList<>();
}
}
else if(req.getDate().equals("2023-03-02")) {
data = new ArrayList<>();
}
else {
if (currentPage == 1) {
data = IntStream.rangeClosed(16, 20).mapToObj(i -> ExternalRestApiDto.builder().age(i).name("pooney" + i).build()).collect(Collectors.toList());
}
else if (currentPage == 2) {
data = IntStream.rangeClosed(21, 28).mapToObj(i -> ExternalRestApiDto.builder().age(i).name("pooney" + i).build()).collect(Collectors.toList());
}
else {
data = new ArrayList<>();
}
}
ExternalRestApiResult<ExternalRestApiDto> result = new ExternalRestApiResult<>("200", data);
return new ObjectMapper().writeValueAsString(result);
}
}
@ToString
@Builder
@Getter
public class ExternalReq {
private Integer pageSize;
private Integer page;
private String date;
}
인스턴스의 변수의 의미를 설명 드리자면 아래와 같습니다.
PAGE_SIZE | 페이지 단위 |
START_DATE | 시작날짜 |
END_DATE | 종료날짜 |
clazz | 제네릭으로 타입을 받기 위한 클래스정보 |
currentIdx | 외부 API로 부터 응답받은 readData를 read()에서 하나씩 읽어 return 하는데 해당 시점의 readData의 index정보 |
cuurentPage | 외부 API를 호출 하는 시점의 페이지 번호 |
readData | 외부 API로 부터 응답은 데이터 리스트 |
excuteCount | 외부 API 호출 카운트 |
FINISH_COUNT | 외부 API 호출 카운트가 너무 많으면 무한루프에 빠졌다고 판단하여 종료를 하기위한 호출 최대회수 |
여기서 가장 중요한 부분은 read() 일꺼 같은데요. Spring Batch는 ItemReader의 read()를 호출하여 데이터를 하나씩 가져옵니다. 이때 chunk size와 같아지면 해당 list를 processor, writer에게 넘기면서 chunk 단위로 트랜잭션이 이루어질 수가 있는건데요. 때문에 read부분을 어떻게 구성하냐가 reader를 만드는데 가장 중요하다고 저는 생각합니다. 그래서 저는 아래와 같이 구성했습니다.
- 1번은 첫 호출이거나 해당 restData의 마지막 Idx를 return한 이후에 fectNextPage()를 통해 다음페이지를 조회를 합니다.
- 1번을 통한 결과가 빈리스트이면서 마지막날짜가 아니면 2번을 통해 해당 1번에서 조회한 날짜의 다음 날짜를 조회합니다.
- 2번의결과가 빈리스트이면 null을 리턴하여 해당 Step의 종료를 알리고 조회결과 데이터가 존재하면 응답 받은 해당 readData에서 currentIdx를 통해 객체를 하나를 리턴하고 이것을 반복합니다.
- chunk 사이즈가 만들어지면 processor, writer에게 전달합니다.
1번의 다음페이지를 조회하는 코드는 아래와 같습니다. fecthNextPage를 호출 할때 마다 currentIdx를 0으로 바꾸고 currentPage를 +1를 진행합니다. 그래야 다음 페이지를 조회 할 수 있고 currentIdx를 0으로 함으로써 다시 readData의 처음 index부터 return 할 수 있기 때문입니다.
checkLoop를 통해 너무 과도한 요청을 차단을 막기위한 방어 로직을 구성하였습니다.
2번의 다음 날짜를 조회하는 코드는 아래와 같습니다. 다음 날짜를 조회하기전 currentPage를 초기화하고 currentDate + 1하여 외부 API를 호출합니다. 그래야 다음날짜의 1번페이지부터 조회가 가능하기때문입니다. 그럼에도 결과가 없으면 while문을 통해 마지막날짜이면서 응답 결과에 데이터가 있을때 까지 다음날짜를 반복 조회합니다. 마지막 날짜가 되었음에도 없으면 currentIdx를 0으로 만들고 currentPage를 +1 하여 초기화 진행 후 빈리스트를 반환을 합니다. 초기화를 하는 이유는 결과값이 있을때 1번을 통해 다음 날짜의 다음페이지를 조회하기 위함입니다.
외부 Api를 호출하는 부분은 WebClient, RestTemplete을 통해 구현을 하면 되지만 해당부분은 임시코드로 대체하여 진행했습니다. 2023-03-01~2023-03-03까지 조회 결과를 아래와 같은 형태로 구성을 했습니다. 아래에 의미는 빈리스트를 각 날짜에 넣어 테스트하기 위함입니다.
public String getRestApiData(ExternalReq req) throws JsonProcessingException {
List<ExternalRestApiDto> data = null;
if(req.getDate().equals("2023-03-01")) {
if (currentPage == 1) {
data = IntStream.rangeClosed(1, 5).mapToObj(i -> ExternalRestApiDto.builder().age(i).name("pooney" + i).build()).collect(Collectors.toList());
} else if (currentPage == 2) {
data = IntStream.rangeClosed(6, 8).mapToObj(i -> ExternalRestApiDto.builder().age(i).name("pooney" + i).build()).collect(Collectors.toList());
} else {
data = new ArrayList<>();
}
}
else if(req.getDate().equals("2023-03-02")) {
data = new ArrayList<>();
}
else {
if (currentPage == 1) {
data = IntStream.rangeClosed(16, 20).mapToObj(i -> ExternalRestApiDto.builder().age(i).name("pooney" + i).build()).collect(Collectors.toList());
}
else if (currentPage == 2) {
data = IntStream.rangeClosed(21, 28).mapToObj(i -> ExternalRestApiDto.builder().age(i).name("pooney" + i).build()).collect(Collectors.toList());
}
else {
data = new ArrayList<>();
}
}
ExternalRestApiResult<ExternalRestApiDto> result = new ExternalRestApiResult<>("200", data);
return new ObjectMapper().writeValueAsString(result);
}
그러면 아래와 같이 JobParameter를 넘기고 디버깅을 해보겠습니다.
statdDate에는 2023-03-01 , endDate에는 2023-03-03이 정상적으로 들어가 있고 첫 번재 조회 이기때문에 첫 번째 if문을 타는것을 볼 수 있습니다.
첫번째 조회가 결과가 pooney1 ~ pooney5 까지 응답을 받았기때문에 3번째 if문을 타고 하나씩 객체를 리턴하게 됩니다.
한 턴이 지나고 currentIdx가 0 -> 1 로 변경 된것을 확인 할 수 있습니다.
currentDate = 2023-03-01 , currentPage = 4, readData = 0 임으로 해당 날짜는 더이상 데이터 없다 판단하여 2번 if을 타고 다음 날짜를 조회하게 됩니다.
2023-03-02에는 데이터가 없기 때문에 마지막 날짜인 2023-03-03을 조회하고 해당 Job은 끝이납니다.
그러면 최종적으로 데이터는 정상적으로 다 들어가 있는지 확인 보겠습니다. DB에 pooney1 ~pooney28까지 쌓여 있는지 확인 해보겠습니다.
성공적으로 쌓여있는걸 확인 할 수 있었습니다. 해당 Job 구성하게 된 이유는 정산을 위해 결제데이터를 가져와야 하는데 해당 API가 특정 날짜만 요청 가능한 형태로 구성되어 있어서 단건으로 조회하여 가져오는건 아무래도 리소스 낭비라고 판단했습니다. 편하게 한다면 단건으로 JOB을 돌려서 구성 할 수 있으나 커넥션, 오베헤드등 특히 운영에서는 문제가 많아 좋지 않다고 판단하였고 범위검색으로 모든 데이터를 저희 쪽 DB에 동기화하는 Job을 만들어야 겠다고 판단했습니다. 하지만 대부분 찾아보면 API 호출 형태가 아니라 DB중심으로 이루져있고 있어도 기간범위 검색, 페이징을 구현하는 글을 찾기가 힘들어 열심히 공부하면서 만들었습니다. 부족한 점이 많지만 좋게 봐주셨으면 좋겠습니다. 감사합니다.