스프링 배치 도입 이유
- 전체 데이터가 트랜잭션 범위이다 보니 관리가 어려움
- 에러 발생 시 전체 롤백..
- 데이터가 많아질 수록 더 효율적인 방법들이 필요함. @Schedule 안에서 모든 작업을 처리하는 방식은 너무 무대뽀임.
- 병렬 처리
- 분할 처리
=> Chunk Oriented 프로세싱을 지향하는 스프링 배치는 좋은 선택지가 될 것이라고 생각됨. 청크 기반으로 트랜잭션 단위를 분할하고, 멀티 스레드 기법을 통해 더 나은 성능 및 효율로 데이터를 처리할 수 있음.
배치 설계
- 일간 투두 성취 누적 포인트 반영 배치 설계
- 정의: 매일 모든 유저들의 전날 투두 성취를 저장하고, 포인트를 집계하여 반영한다.
- 배치 주기: 매일 새벽 1시 (트래픽이 적은 시기)
- 2개의 Step으로 구성:
Member 상태 업데이트 step
->TodoAchievementHistory 생성 step
updateMemberStep
- Reader: 멤버 테이블에서 dailyAcheivement가 0보다 큰 모든 멤버 정렬하여 조회
- Writer: 멤버의 dailyAchievementCnt를 참조하여 ⭐️추가 + dailyAchievementCnt를 0으로 초기화 + scheduledReward를 0으로 초기화
createTodoAchievementHistoryStep
- Reader: 멤버 테이블에서 dailyAcheivement가 0보다 큰 모든 멤버 정렬하여 조회 (이때, executionContext에 데이터가 있다면 사용)
- Processor: TodoAchievementHistory 객체 생성
- Writer: TodoAchievementHistory 저장
- 일정 알림 배치
- 정의: 유저가 생성한 일정 30분 전에 알림을 보낸다.
- 배치 주기: 1분 마다?
- Reader: 투두 테이블 & 멤버 테이블 조회하며 알림 대상 호출 (
SendNotificationBatchVO
) - Writer: 알림 전송 서비스 호출
스프링 배치에 대한 소개 및 자세한 구현 내용은 생략하고 개발 과정에서의 고민 과정을 풀어나가 보겠다..
일간 투두 성취 누적 포인트 반영 배치고민1: MemberItemReader의 중복..
현재 이 Job은 updateMemberStep과 createTodoAchievementHistoryStep이라는 2개의 step으로 구성되어 있다.
명확하게 책임이 분리되어 있어서 이상적이지만, 한 가지 문제점은 2개의 step이 동일한 Read 작업을 요구하여 동일한 ItemReader
를 사용한다는 점이다.
이는, 동일한 대량의 데이터를 2번이나 읽는 비효율적인 작업이다. 이를 위해 나는 다음과 같은 대안을 생각해보며 여행을 떠났다...
1. ExecutionContext를 이용한 데이터 캐싱
JobExecutionContext를 통해 Step 간의 데이터를 공유할 수 있다는 점을 공부했었다. 이를 활용해서 해결할 수 있을 것이라고 보고, 다음과 같이 코드를 구현해보았다.
DailyTodoAchievementJobConfig
@Slf4j
@Configuration
@RequiredArgsConstructor
public class DailyTodoAchievementJobConfig {
private static final int CHUNK_SIZE = 100;
private final JobRepository jobRepository;
private final PlatformTransactionManager tx;
private final EntityManagerFactory entityManagerFactory;
@Bean
public Job dailyTodoAchievementJob(
Step updateMemberStep,
Step createTodoAchievementHistoryStep
) throws Exception {
return new JobBuilder("dailyTodoAchievementJob", jobRepository)
.validator(new DateParameterValidator())
.incrementer(new RunIdIncrementer())
.start(updateMemberStep)
.next(createTodoAchievementHistoryStep)
.listener(new StopWatchJobListener())
.build();
}
@Bean
@JobScope
public Step updateMemberStep(
ItemReader<Member> memberItemReader,
ItemWriter<Member> memberUpdateWriter
) throws Exception {
return new StepBuilder("updateMemberStep", jobRepository)
.<Member, Member>chunk(CHUNK_SIZE, tx) // 한 번에 처리할 청크 크기
.reader(memberItemReader)
.writer(memberUpdateWriter)
.listener(new MemberStepListener(processedMembers()))
.build();
}
@Bean
@JobScope
public Step createTodoAchievementHistoryStep(
ItemReader<MemberAchievementDTO> cachedMemberReader,
ItemProcessor<MemberAchievementDTO, TodoAchievementHistory> createTodoAchievementHistoryProcessor,
ItemWriter<TodoAchievementHistory> todoAchievementHistoryItemWriter
) throws Exception {
return new StepBuilder("createTodoAchievementHistoryStep", jobRepository)
.<MemberAchievementDTO, TodoAchievementHistory>chunk(CHUNK_SIZE, tx) // 한 번에 처리할 청크 크기
.reader(cachedMemberReader)
.processor(createTodoAchievementHistoryProcessor)
.writer(todoAchievementHistoryItemWriter)
.build();
}
@Bean
@StepScope
public ItemReader<MemberAchievementDTO> cachedMemberReader(
@Value("#{jobExecutionContext['processedMembers']}") List<MemberAchievementDTO> processedMemberAchievements) {
// jobExecutionContext에서 processedMembers라는 key에 해당하는 value를 가져와서 읽는다.
return new ListItemReader<>(processedMemberAchievements);
}
@Bean
@StepScope
public JpaPagingItemReader<Member> memberItemReader() throws Exception {
JpaPagingItemReader<Member> reader = new JpaPagingItemReader<>() {
@Override
public int getPage() {
return 0;
}
};
// memberUpdateWriter에서 100개의 Member의 Status가 초기화되는데, 이로 인해 ItemReader가 다음 조회 시 누락된다. 이에 더불어 페이지는 +1 씩 되므로 자꾸 100개의 데이터가 씹히는 현상이 있었다.
// 이를 해결하기 위해, page를 불러오는 함수를 반드시 첫번째 페이지만 읽도록 고정해두었다. (조회되는 데이터가 ItemWriter로 인해 자꾸 변하기 때문)
reader.setName("memberItemReader");
reader.setPageSize(CHUNK_SIZE);
reader.setEntityManagerFactory(entityManagerFactory);
reader.setQueryString("select m from Member m WHERE m.dailyAchievementCnt > 0");
return reader;
}
@Bean
@StepScope
public ItemWriter<Member> memberUpdateWriter(MemberStateUpdateService memberStateUpdateService, List<MemberAchievementDTO> processedMembers) {
return new MemberUpdateWriter(processedMembers, memberStateUpdateService);
}
@Bean
@StepScope
public CreateTodoAchievementHistoryProcessor createTodoAchievementHistoryProcessor(
@Value("#{jobParameters['date']}") String date) {
return new CreateTodoAchievementHistoryProcessor(date);
}
@Bean
@StepScope
public JpaItemWriter<TodoAchievementHistory> todoAchievementHistoryItemWriter() {
return new JpaItemWriterBuilder<TodoAchievementHistory>()
.usePersist(true)
.entityManagerFactory(entityManagerFactory)
.build();
}
@Bean // ExecutionContext에 저장하여 2번째 step에게 공유할 MemberAchievementDto
public List<MemberAchievementDTO> processedMembers() {
return new ArrayList<>();
}
}
MemberUpdateWriter
@RequiredArgsConstructor
public class MemberUpdateWriter implements ItemWriter<Member> {
private final List<MemberAchievementDTO> processedMembers;
private final MemberStateUpdateService memberStateUpdateService;
@Override
public void write(Chunk<? extends Member> chunk) throws Exception {
List<? extends Member> members = chunk.getItems();
members.forEach(member -> {
processedMembers.add(new MemberAchievementDTO(member.getId(), member.getDailyAchievementCnt())); // Bean으로 등록해둔 processedMembers에 처리된 데이터를 추가해준다.
memberStateUpdateService.addStarAndResetAchieveCnt(member);
});
}
}
MemberStepListener
@Component
public class MemberStepListener {
private final List<MemberAchievementDTO> processedMembers;
public MemberStepListener(List<MemberAchievementDTO> processedMembers) {
this.processedMembers = processedMembers;
}
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
processedMembers.clear();
}
@AfterStep
public ExitStatus afterStep(StepExecution stepExecution) {
stepExecution.getJobExecution().getExecutionContext().put("processedMembers", processedMembers); // jobExecutionContext에 processedMembers라는 key를 통해 가공된 DTO를 저장해둔다.
return stepExecution.getExitStatus();
}
}
-> 이를 통해, 불필요하게 2번 Member를 전체 조회하는 동작은 해소되었다. 하지만, ExecutionContext
는 메모리에 올라간다는 점이 찝찝하다.
테스트 결과
이렇게 코드를 작성한 후, 10만 건의 Member
데이터에 대해 배치 작업을 돌려보면?
========================================
총 소요 시간 : 27414
========================================
-> Member
와 TodoAchievementHistory
가 Entity라 JpaItemWrtier
를 사용했는데, 쿼리를 찍어보니 item 하나당 하나의 쿼리가 나가는 것을 확인했다..
-> bulk 연산이 당연히 지원되는 줄 알았는데 JpaItemWriter
는 지원이 되지 않나보다.
=> 일단 JpaItemWriter
대신 bulk 연산을 지원하는 JdbcBatchItemWriter
를 사용해보자
2. JdbcPagingItemReader + JdbcBatchItemWriter
DailyTodoAchievementJobConfig
@Slf4j
@Configuration
@RequiredArgsConstructor
public class DailyTodoAchievementJobConfig {
private static final int CHUNK_SIZE = 100;
private final JobRepository jobRepository;
private final PlatformTransactionManager tx;
private final DataSource dataSource;
@Bean
public Job dailyTodoAchievementJob(
Step updateMemberStep,
Step createTodoAchievementHistoryStep
) throws Exception {
return new JobBuilder("dailyTodoAchievementJob", jobRepository)
.validator(new DateParameterValidator())
.incrementer(new RunIdIncrementer())
.start(updateMemberStep)
.next(createTodoAchievementHistoryStep)
.listener(new StopWatchJobListener())
.build();
}
@Bean
@JobScope
public Step updateMemberStep(
ItemReader<MemberAchievementDTO> memberItemReader,
CompositeItemWriter<MemberAchievementDTO> memberUpdateWriter
) throws Exception {
return new StepBuilder("updateMemberStep", jobRepository)
.<MemberAchievementDTO, MemberAchievementDTO>chunk(CHUNK_SIZE, tx) // 한 번에 처리할 청크 크기
.reader(memberItemReader)
.writer(memberUpdateWriter)
.listener(new MemberStepListener(processedMembers()))
.build();
}
@Bean
@JobScope
public Step createTodoAchievementHistoryStep(
ItemReader<MemberAchievementDTO> cachedMemberReader,
ItemProcessor<MemberAchievementDTO, TodoAchievementHistory> createTodoAchievementHistoryProcessor,
ItemWriter<TodoAchievementHistory> todoAchievementHistoryItemWriter
) throws Exception {
return new StepBuilder("createTodoAchievementHistoryStep", jobRepository)
.<MemberAchievementDTO, TodoAchievementHistory>chunk(CHUNK_SIZE, tx) // 한 번에 처리할 청크 크기
.reader(cachedMemberReader)
.processor(createTodoAchievementHistoryProcessor)
.writer(todoAchievementHistoryItemWriter)
.build();
}
@Bean
@StepScope
public ItemReader<MemberAchievementDTO> cachedMemberReader(
@Value("#{jobExecutionContext['processedMembers']}") List<MemberAchievementDTO> processedMemberAchievements) {
return new ListItemReader<>(processedMemberAchievements);
}
@Bean
@StepScope
public JdbcPagingItemReader<MemberAchievementDTO> memberItemReader(
PagingQueryProvider queryProvider
) throws Exception {
// JdbcPagingItemReader로 변경
JdbcPagingItemReader<MemberAchievementDTO> reader = new JdbcPagingItemReader<>() {
@Override
public int getPage() {
return 0;
}
};
reader.setName("memberItemReader");
reader.setPageSize(CHUNK_SIZE);
reader.setDataSource(dataSource);
reader.setRowMapper(new MemberAchievementRowMapper());
reader.setQueryProvider(queryProvider);
return reader;
}
@Bean
public PagingQueryProvider queryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource);
queryProvider.setSelectClause("id, daily_achievement_cnt");
queryProvider.setFromClause("from member");
queryProvider.setWhereClause("where daily_achievement_cnt > 0");
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
@Bean
@StepScope
public CompositeItemWriter<MemberAchievementDTO> memberUpdateWriter() {
// CachingDTOItemWriter + JdbcBatchItemWriter로 변경
JdbcBatchItemWriter<MemberAchievementDTO> jdbcBatchItemWriter = new JdbcBatchItemWriterBuilder<MemberAchievementDTO>()
.dataSource(dataSource)
.sql("UPDATE member " +
"SET star_point = star_point + scheduled_reward, " +
"daily_achievement_cnt = 0, " +
"scheduled_reward = 0 " +
"WHERE id = ?")
.itemPreparedStatementSetter((item, ps) -> {
ps.setLong(1, item.getMemberId());
})
.build();
CompositeItemWriter<MemberAchievementDTO> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(cachingDtoItemWriter(), jdbcBatchItemWriter));
return compositeItemWriter;
}
@Bean
@StepScope
public ItemWriter<MemberAchievementDTO> cachingDtoItemWriter() {
// 단순히 chunk 단위의 아이템들을 한번에 executionContext에 저장해주는 writer
// 의미상 itemProcessor에 들어가는게 더 맞았을 것 같지만.. chunk 단위로 저장하고 싶었다.
return new CachingDTOItemWriter(processedMembers());
}
@Bean
@StepScope
public CreateTodoAchievementHistoryProcessor createTodoAchievementHistoryProcessor(
@Value("#{jobParameters['date']}") String date) {
return new CreateTodoAchievementHistoryProcessor(date);
}
@Bean
@StepScope
public JdbcBatchItemWriter<TodoAchievementHistory> todoAchievementHistoryItemWriter() {
// JdbcBatchItemWriter로 변경
return new JdbcBatchItemWriterBuilder<TodoAchievementHistory>()
.dataSource(dataSource)
.sql("INSERT INTO todo_achievement_history (member_id, cnt, date, created_at, updated_at) VALUES (:memberId, :cnt, :date, now(), now())")
.beanMapped()
.build();
}
@Bean
public List<MemberAchievementDTO> processedMembers() {
return new ArrayList<>();
}
}
MemberAchievementRowMapper
public class MemberAchievementRowMapper implements RowMapper<MemberAchievementDTO> {
@Override
public MemberAchievementDTO mapRow(ResultSet rs, int i) throws SQLException {
return new MemberAchievementDTO(
rs.getLong("id"),
rs.getLong("daily_achievement_cnt")
);
}
}
테스트 결과
========================================
총 소요 시간 : 8243
========================================
- 27414ms -> 8243ms => 약
70%
개선
3. 하나의 Step으로 통일한 후, CompositeWriter 사용
성능은 어느정도 개선된 것 같지만, 데이터 수를 늘리다보니 큰 문제가 발생했다.
테스트를 위해 사용했던 H2 데이터베이스는 JVM 힙 메모리에서 구동되다보니 데이터가 약 50만건 이상의 데이터를 집어넣으려고 해보았을 때, 메모리 에러가 발생했다.
Caused by: java.lang.OutOfMemoryError: Java heap space
이를 해결하기 위해서 메모리를 늘려볼까 했으나, 실제 구동될 mysql로 테스트를 진행하는 것도 나쁘지 않을 것 같다고 생각하여 mysql로 변경 후 부푼 마음으로 다시 테스트를 진행해보았다.
하지만, 다시 한번 큰 문제가 발생한다..
우리는 캐싱을 위해 ExecutionContext
를 사용했었다. 하지만, 이 ExecutionContext
는BATCH_JOB_EXECUTION_CONTEXT
테이블의 SERIALIZED_CONTEXT
라는 컬럼에 저장된다. 그리고 mysql에서는 그 자료형이 TEXT
로 약 64KB(약 65,535자)를 허용할 수 있는 크기이다. 우리가 저장할 데이터는 수십 수백만건의 데이터이므로 이는 턱없이 부족한 용량이다.
H2 데이터베이스에서의 해당 컬럼의 자료형은 LONGVARCHAR로 약 2GB까지 허용 가능한 자료형이어서 이전까지는 이런 문제가 발생하지 않았었다.
그러다보니 다음과 같은 에러가 발생한다.
Caused by:com.mysql.cj.jdbc.exceptions.MysqlDataTruncation:
Data truncation:
Data too
long for column 'SERIALIZED_CONTEXT'
at row 1
데이터가 너무 많아서 저장도 되지 않는다. 이를 해결하기 위한 간단한 방법은 테이블 초기화 SQL을 수동으로 작성해주고, 이때 자료형을 LONGTEXT
같은 자료형으로 변경해주는 것이지만 이는 그 순간에서의 해결책밖에 되지 못할뿐 근본적인 해결책이 되지 못한다. 즉, 메모리에 모든 데이터를 캐싱해두는 전략은 장기적으로 보았을 때 좋은 방식은 아니다.
또한, 메모리에 모든 데이터를 저장한다는 점에서 자료형이 넉넉하더라도 메모리가 충분하지 않으면 역시 OutOfMemoryError
가 발생할 수도 있을 것이다.
이를 해결하기 위해 Redis와 같은 글로벌 캐시를 도입할 수도 있겠으나, 이 방식 역시 데이터가 많아지면 Redis가 터질 수도 있다는 단점이 존재한다. 그렇다면 다른 근본적인 해결책으로 무엇이 있을까? 고민해보자..
우리의 방식의 근본적인 문제는 똑같은 데이터를 2번 조회한다는 점이다. -> 그럼 하나의 Step으로 합치면 되지 않나?
지금까지 그러지 못했던 이유는 Jpa를 사용하다보니 Entity라는 개념에 묶여있었다는 점과 하나의 Step에서 입력받는 데이터가 서로 다른 2개의 itemWriter를 사용할 수 없기 때문이었다.
하지만, 엔터티라는 틀에서 벗어나 사실상 정말로 필요한 데이터만 뽑아보면, member_id
와 daily_achievement_cnt
뿐이다. 이 2개의 값만 있으면 2개의 itemWriter를 모두 운영할 수 있다. (사실 JpaItemReader
와 JpaItemWriter
를 사용하지 않는 순간부터 엔터티의 개념이 필요없어져서 모두 MemberAchievementDTO
를 사용하긴 했다.)
이를 바탕으로 우리의 Job을 다음과 같이 재구성할 수 있다.
1개의 Step
MemberAchievementDTO
를 읽는 1개의 ItemReaderCompositeItemWriter<MemberAchievementDTO>
- 멤버의 status를 변경해주는
ItemWriter<MemberAchievementDTO>
- TodoAchievementHistory를 생성하는
ItemWriter<MemberAchievementDTO>
- 멤버의 status를 변경해주는
CompositeItemWriter를 사용하면 여러 개의 itemWriter를 하나의 step에서 사용 가능하다. 대신 문제는 Processor에서 가공되어 넘겨받는 데이터 타입을 통일시켜야한다는 점이다.
이전까지는 하나의 Writer는 ItemWriter<Member> 타입이고, 다른 하나는 ItemWriter<MemberAchievementDTO> 타입이기에 불가능했다..
결과
DailyTodoAchievementJobConfig
@Slf4j
@Configuration
@RequiredArgsConstructor
public class DailyTodoAchievementJobConfig {
private static final int CHUNK_SIZE = 100;
private final JobRepository jobRepository;
private final PlatformTransactionManager tx;
private final DataSource dataSource;
@Bean
public Job dailyTodoAchievementJob(
Step updateMemberStep,
Step createTodoAchievementHistoryStep
) throws Exception {
return new JobBuilder("dailyTodoAchievementJob", jobRepository)
.validator(new DateParameterValidator())
.incrementer(new RunIdIncrementer())
.start(updateMemberStep)
.next(createTodoAchievementHistoryStep)
.listener(new StopWatchJobListener())
.build();
}
@Bean
@JobScope
public Step dailyTodoAchievementStep(
ItemReader<MemberAchievementDTO> memberItemReader,
CompositeItemWriter<MemberAchievementDTO> compositeItemWriter
) {
return new StepBuilder("dailyTodoAchievementStep", jobRepository)
.<MemberAchievementDTO, MemberAchievementDTO>chunk(CHUNK_SIZE, tx) // 한 번에 처리할 청크 크기
.reader(memberItemReader)
.writer(compositeItemWriter)
.build();
}
@Bean
@StepScope
public JdbcPagingItemReader<MemberAchievementDTO> memberItemReader(
@Value("#{jobParameters['date']}") String date,
PagingQueryProvider queryProvider
) throws Exception {
JdbcPagingItemReader<MemberAchievementDTO> reader = new JdbcPagingItemReader<>() {
@Override
public int getPage() {
return 0;
}
};
reader.setName("memberItemReader");
reader.setPageSize(CHUNK_SIZE);
reader.setDataSource(dataSource);
reader.setRowMapper(new MemberAchievementRowMapper(date));
reader.setQueryProvider(queryProvider);
return reader;
}
@Bean
public PagingQueryProvider queryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource);
queryProvider.setSelectClause("id, daily_achievement_cnt");
queryProvider.setFromClause("from member");
queryProvider.setWhereClause("where daily_achievement_cnt > 0");
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
@Bean
@StepScope
public CompositeItemWriter<MemberAchievementDTO> compositeItemWriter(
JdbcBatchItemWriter<MemberAchievementDTO> updateMemberStateItemWriter,
JdbcBatchItemWriter<MemberAchievementDTO> todoAchievementHistoryItemWriter
) {
CompositeItemWriter<MemberAchievementDTO> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(updateMemberStateItemWriter, todoAchievementHistoryItemWriter));
return compositeItemWriter;
}
@Bean
@StepScope
public JdbcBatchItemWriter<MemberAchievementDTO> updateMemberStateItemWriter() {
return new JdbcBatchItemWriterBuilder<MemberAchievementDTO>()
.dataSource(dataSource)
.sql("UPDATE member " +
"SET star_point = star_point + scheduled_reward, " +
"daily_achievement_cnt = 0, " +
"scheduled_reward = 0 " +
"WHERE id = ?")
.itemPreparedStatementSetter((item, ps) -> {
ps.setLong(1, item.getMemberId());
})
.build();
}
@Bean
@StepScope
public JdbcBatchItemWriter<MemberAchievementDTO> todoAchievementHistoryItemWriter() {
return new JdbcBatchItemWriterBuilder<MemberAchievementDTO>()
.dataSource(dataSource)
.sql("INSERT INTO todo_achievement_history (member_id, cnt, date, created_at, updated_at) VALUES (:memberId, :achievementCount, :date, now(), now())")
.beanMapped()
.build();
}
}
- 이전 코드와 거의 동일하다. 캐싱처리를 하는 부분 및
StepListener
가 빠지고, 2개의 itemWriter를CompositeItemWriter
로 묶어줬을 뿐이다. - 하나 다른 점은 itemProcessor가 없기에, itemReader에서
jobParameters
를 받아온다.
테스트 결과
- H2 10만건 배치
========================================
총 소요 시간 : 6030
========================================
- 27414ms -> 6030ms => 약
78%
개선
상당히 개선되었다.
MySQL에서의 소요 시간은 다음과 같다.
- MySQL 10만건 배치
========================================
총 소요 시간 : 24761
========================================
확실히 메모리 방식이 아니다 보니 속도가 현저히 느려진 것을 볼 수 있다.
100만 건도 한번 진행해보았는데, 배치 작업이 개선이 되었음에도 너무 느렸다. 다른 방식이 필요하다..
- 100만건 배치
========================================
총 소요 시간 : 1359977
========================================
여기서 생각해볼 수 있는 것이 멀티 스레드이다. 단일 스레드로 100만건의 데이터를 읽고, 200만건의 데이터를 쓰는 것은 아무래도 속도가 느릴 수 밖에 없다.
멀티 스레드 기법을 사용해서 속도를 더욱 향상시켜보자!
고민2. 멀티 스레드 기법 도입!
Spring Batch에는 대표적으로 4개의 멀티 스레드 모델이 존재한다.
AsyncItemProcessor / AsyncItemWriter
Multi-threaded Step
Parallel Steps
Partitioning
이 중에서 어떤 모델을 사용해야 할지를 고민하는 것이 먼저일 것이다.
모델 선택 과정
AsyncItemProcessor / AsyncItemWriter
- Step 안에서
ItemProcessor
와ItemWriter
가 비동기적으로 실행되는 구조로, 별개의 쓰레드를 통해ItemProcessor
와ItemWriter
를 처리한다. spring-batch-integration
의존성이 필요하다- Async의 경우 처리 로직이 무거워서 CPU 부하가 큰 작업들에 적용하기 적합하다. 예를 들어, 복잡한 데이터 변환이나 외부 서비스 호출 등이 있을 것이다.
- 하지만, 우리의 경우 로직이 무겁다기 보단 단순히 대량의 데이터를 읽고 쓰는 I/O 작업이므로, 디스크 I/O 속도 자체를 늘려야 한다. 때문에 적합하지는 않아 보인다.
- 또한, 애초에 ItemReader의 경우는 멀티 스레드를 사용할 수 없다는 점이 문제다
- Step 안에서
Multi-threaded Step
- 단일 Step을 수행할 때, 해당 Step 내의 각 Chunk를 별도의 여러 쓰레드에서 실행하는 구조로, Thread-Safe한 Reader가 필요하며 각 스레드마다
ItemReader
,ItemProcessor
,ItemWriter
를 공유한다.
- 단일 Step을 수행할 때, 해당 Step 내의 각 Chunk를 별도의 여러 쓰레드에서 실행하는 구조로, Thread-Safe한 Reader가 필요하며 각 스레드마다
Parallel Steps
- SplitState를 사용해서 여러 개의 Flow들을 병렬적으로 실행하는 구조로, 우리의 Job은 FlowJob이 아니므로 구조를 변경하지 않는 이상 사용할 수는 없을 것 같다.
Partitioning
MasterStep
이SlaveStep
을 실행시키고, SlaveStep이 각 스레드에 의해 독립적으로 실행되는 구조로, 우리의 기존 step인 dailyTodoAchievementStep를
slaveStep으로 두고, PartitionStep인 MasterStep을 추가하면 구현가능할 듯하다.- 데이터를 더 작은 Chunk (파티션이라고 함)로 나눈 다음 파티션에서 슬레이브가 독립적으로 작동하는 방식이다.
Multi-threaded Step vs Partitioning
멀티쓰레드 Step은 단일 Step을 Chunk 단위로 쓰레드를 생성해 분할 처리-> 어떤 쓰레드에서 어떤 데이터들을 처리하게 할지 세밀한 조정이 불가능
-> 해당 Step의 ItemReader/ItemWriter 등이 멀티쓰레드 환경을 지원하는지 유무가 굉장히 중요
파티셔닝 (Partitioning)의 독립적인 Step (Worker Step)을 구성-> 각각 별도의 StepExecution 파라미터 환경을 가지게 하여 처리
-> (Local로 실행할 경우) 멀티쓰레드로 작동하나, 멀티스레드 Step과 별개로 ItemReader/ItemWriter의 멀티스레드 환경 지원 여부가 중요하지 않다.
우리의 경우, 이미 ItemReader / ItemWriter가 모두 Thread-safe 하며, 데이터들에 대한 세밀한 조정이 굳이 필요하지 않은 상황이다.
때문에 MasterStep, SlaveStep을 나누고, Partitioning을 구현하는 작업은 불필요하다고 생각되어 Multi-treaded Step
모델을 선택했다.
완성 코드
@Slf4j
@Configuration
@RequiredArgsConstructor
public class DailyTodoAchievementJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager tx;
private final DataSource dataSource;
private int chunkSize;
private int poolSize;
@Value("${chunkSize:1000}")
public void setChunkSize(int chunkSize) {
this.chunkSize = chunkSize;
}
@Value("${poolSize:10}")
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
}
@Bean
public TaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(poolSize);
executor.setMaxPoolSize(poolSize * 2);
executor.setThreadNamePrefix("multi-thread-");
executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
executor.initialize();
return executor;
}
@Bean
public Job dailyTodoAchievementJob(
Step updateMemberStep,
Step createTodoAchievementHistoryStep
) throws Exception {
return new JobBuilder("dailyTodoAchievementJob", jobRepository)
.validator(new DateParameterValidator())
.incrementer(new RunIdIncrementer())
.start(updateMemberStep)
.next(createTodoAchievementHistoryStep)
.listener(new StopWatchJobListener())
.build();
}
@Bean
@JobScope
public Step dailyTodoAchievementStep(
ItemReader<MemberAchievementDTO> memberItemReader,
CompositeItemWriter<MemberAchievementDTO> compositeItemWriter
) {
return new StepBuilder("dailyTodoAchievementStep", jobRepository)
.<MemberAchievementDTO, MemberAchievementDTO>chunk(chunkSize, tx) // 한 번에 처리할 청크 크기
.reader(memberItemReader)
.writer(compositeItemWriter)
.taskExecutor(executor())
.build();
}
@Bean
@StepScope
public JdbcPagingItemReader<MemberAchievementDTO> memberItemReader(
@Value("#{jobParameters['date']}") String date,
PagingQueryProvider queryProvider
) throws Exception {
JdbcPagingItemReader<MemberAchievementDTO> reader = new JdbcPagingItemReader<>();
reader.setName("memberItemReader");
reader.setPageSize(chunkSize);
reader.setDataSource(dataSource);
reader.setRowMapper(new MemberAchievementRowMapper(date));
reader.setQueryProvider(queryProvider);
reader.setSaveState(false);
return reader;
}
@Bean
public PagingQueryProvider queryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource);
queryProvider.setSelectClause("id, daily_achievement_cnt");
queryProvider.setFromClause("from member");
queryProvider.setWhereClause("where daily_achievement_cnt > 0");
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
@Bean
@StepScope
public CompositeItemWriter<MemberAchievementDTO> compositeItemWriter(
JdbcBatchItemWriter<MemberAchievementDTO> updateMemberStateItemWriter,
JdbcBatchItemWriter<MemberAchievementDTO> todoAchievementHistoryItemWriter
) {
CompositeItemWriter<MemberAchievementDTO> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(todoAchievementHistoryItemWriter, updateMemberStateItemWriter));
return compositeItemWriter;
}
@Bean
@StepScope
public JdbcBatchItemWriter<MemberAchievementDTO> updateMemberStateItemWriter() {
return new JdbcBatchItemWriterBuilder<MemberAchievementDTO>()
.dataSource(dataSource)
.sql("UPDATE member " +
"SET star_point = star_point + scheduled_reward, " +
"daily_achievement_cnt = 0, " +
"scheduled_reward = 0 " +
"WHERE id = ?")
.itemPreparedStatementSetter((item, ps) -> {
ps.setLong(1, item.getMemberId());
})
.build();
}
@Bean
@StepScope
public JdbcBatchItemWriter<MemberAchievementDTO> todoAchievementHistoryItemWriter() {
return new JdbcBatchItemWriterBuilder<MemberAchievementDTO>()
.dataSource(dataSource)
.sql("INSERT INTO todo_achievement_history (member_id, cnt, date, created_at, updated_at) VALUES (:memberId, :achievementCount, :date, now(), now())")
.beanMapped()
.build();
}
}
@Value("${poolSize:10}")
생성할 쓰레드 풀의 쓰레드 수를 환경변수로 받아서 사용한다.
${poolSize:10}
에서 10은 앞에 선언된 변수 poolSize가 없을 경우 10을 사용한다는 기본값- 배치 실행시 PoolSize를 조정하는 이유는 실행 환경에 맞게 유동적으로 쓰레드풀을 관리하기 위함이다.
- 개발 환경에서는 1개의 쓰레드로, 운영에선 10개의 쓰레드로 실행할 수도 있고, 혹은 같은 시간대에 수행되는 다른 배치들로 인해서 갑자기 쓰레드 개수를 줄여야 할 수도 있다.
- 언제든 유동적으로 배치 실행시점에 몇개의 쓰레드를 생성할지 결정할 수 있으니 웬만하면 외부에 받아서 사용하는 방식을 선호된다.
- Field가 아닌 Setter로 받는 이유는 Spring Context가 없이 테스트 코드를 작성할때 PoolSize, ChunkSize등을 입력할 방법이 없기 때문이다.
ThreadPoolTaskExecutor
여기서는 쓰레드 풀을 이용한 쓰레드 관리 방식인
ThreadPoolTaskExecutor
을 사용했다.- 이외에도
SimpleAsyncTaskExecutor
가 있는데, 이를 사용할 경우 매 요청시마다 쓰레드를 생성하게 됩니다. - 이때 계속 생성하다가 concurrency limit을 초과할 경우 이후 요청을 막게되는 현상까지 있어, 운영 환경에선 잘 사용하진 않는다.
.saveState(false)
- 멀티쓰레드 환경에서 사용할 경우 필수적으로 사용해야할 옵션이
saveState = false
해당 옵션을 끄게 되면 (false) Reader가 실패한 지점을 저장하지 못하게해, 다음 실행시에도 무조건 처음부터 다시 읽도록 한다.
- 이 옵션을 켜놓으면 오히려 더 큰 문제가 발생할 수 있다.
- 8번째 Chunk 에서 실패했는데, 사실은 4번째 Chunk도 실패했다면 8번째가 기록되어 다음 재실행시 8번째부터 실행될수 있기 때문이다.
- 실패하면 무조건 처음부터 다시 실행될 수 있도록 해당 옵션은 false로 두자.
- 비슷한 기능으로 Job 옵션에 있는
.preventRestart()
가 있는데, 해당 옵션은 Job이 같은 파라미터로 재실행되는것을 금지한다. .saveState(false)
는 Reader가 실패난 지점을 기록하지 못하게 하는 옵션이라 엄밀히 말하면 둘은 서로 다른 옵션이긴 하다. Step 재실행을 막는다정도로 보자.
- 멀티쓰레드 환경에서 사용할 경우 필수적으로 사용해야할 옵션이
테스트 결과
- H2 10만건 배치(chunkSize=500, poolSize=8)
========================================
총 소요 시간 : 2603
========================================
- 27414ms -> 2603ms => 약
90.5%
개선 - MySQL 10만건 배치
========================================
총 소요 시간 : 4924
========================================
- 24761ms -> 4924ms => 약
80.1%
개선
상당히 개선되었다. - MySQL 100만건 배치
========================================
총 소요 시간 : 1359977
========================================
- 1359977ms -> 62348ms => 약
99.5%
개선
상당히 개선되었다.
이정도면.. 만족한다! 다음 배치 Job으로 넘어가보자
일정 알림 배치
- 정의: 유저가 생성한 일정 30분 전에 알림을 보낸다.
- 배치 주기: 1분 마다?
- (Async)Reader: 투두 테이블 & 멤버 테이블 조회하며 알림 대상 호출 (
SendNotificationBatchVO
) - (Async)Writer: 알림 전송 서비스 호출
Wrtier의 경우 외부인프라를 사용하는 부분이며, 알림을 보내는 것을 기다릴 필요가 없기에 AsyncItemWriter
를 사용하는 것이 좋아보인다.
고민1: AsyncItemWriter만 단독으로 사용할 수 없음
...
@Bean
@StepScope
public ItemProcessor<SendNotificationBatchDTO, SendNotificationBatchDTO> sendNotificationProcessor() {
return item -> {
log.info("Send email to = {}", item.getEmail());
return item;
};
}
@Bean
public AsyncItemProcessor<SendNotificationBatchDTO, SendNotificationBatchDTO> asyncSendNotificationProcessor() throws InterruptedException {
AsyncItemProcessor<SendNotificationBatchDTO, SendNotificationBatchDTO> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(sendNotificationProcessor());
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
return asyncItemProcessor;
}
@Bean
@StepScope
public ItemWriter<SendNotificationBatchDTO> sendNotificationWriter(MailService mailService) {
return new SendNotificationWriter(mailService);
}
@Bean
public AsyncItemWriter<SendNotificationBatchDTO> asyncSendNotificationWriter(MailService mailService) {
AsyncItemWriter<SendNotificationBatchDTO> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(sendNotificationWriter(mailService));
return asyncItemWriter;
}
...
- 일단 중간에 email을 보낸다는 로그를 찍어주는
ItemProcessor
를 추가해두었다. 아무 의미 잆는ItemProcessor
이지만, 필요하니.. - 다른 의미있는 작업을 할 수 있는게 있는지 찾아볼 필요가 있겠다.
확실히 비동기 작업으로 처리하니 처리가 매우 빠르다.
AsyncItemProcessor
/AsyncItemWriter
를 적용하기 전에는 메일을 보내는 데 한 세월이 걸렸던 반면,
비동기로 처리하면 기다릴 필요가 없다보니 1초만에 모든 작업이 처리되었다.
고민2: CompositeItemReader
우리 서비스의 경우, Todo와 TodoInstance라는 2가지의 개념이 존재하며, 이는 반복여부에 차이가 있다. 어쨌든, 우리는 알림을 보내기 위해서는 todo와 todoInstance를 모두 조회하여 동일한 포맷으로 알림을 보내줄 필요가 있다.
이를 위해 ItemReader가 2개가 필요하게 되는데, 이들은 다른 데이터를 읽고 같은 데이터를 반환한다는 특징이 있다.
나는 여기서 CompositeItemReader라는 것이 있다면 사용할 수 있겠다고 생각하고 공식문서를 뒤져보았지만, 2024-09-11 현 시점까지는 아직 실험적인 기능으로만 지원된다고 한다..
https://github.com/spring-projects-experimental/spring-batch-experimental#composite-item-reader
실험적인 기능을 프로젝트에 도입할지.. 아니면 일단 뒤로하고 귀찮지만 2개의 Step으로 분리할지가 고민이었다.가능하면 최대한 간단하게 가고싶었고, 데이터만 다를 뿐 완전히 동일한 기능을 하는 것이고, 솔직히 새로운 기능을 써보고 싶다는 욕심 때문에 나는 CompositeItemReader
를 사용해보기로 마음먹었다.
그 구현은 다음과 같다.
...
@Bean
@StepScope
public CompositeItemReader<SendNotificationBatchDTO> sendNotificationReader(
JdbcPagingItemReader<SendNotificationBatchDTO> todoReader,
JdbcPagingItemReader<SendNotificationBatchDTO> todoInstanceReader
) {
return new CompositeItemReader<>(Arrays.asList(todoReader, todoInstanceReader));
}
@Bean
@StepScope
public JdbcPagingItemReader<SendNotificationBatchDTO> todoReader(
PagingQueryProvider todoQueryProvider
) throws Exception {
return new JdbcPagingItemReaderBuilder<SendNotificationBatchDTO>()
.name("todoReader")
.dataSource(this.dataSource)
.pageSize(chunkSize)
.beanRowMapper(SendNotificationBatchDTO.class)
.queryProvider(todoQueryProvider)
.build();
}
@Bean
@StepScope
public JdbcPagingItemReader<SendNotificationBatchDTO> todoInstanceReader(
PagingQueryProvider todoInstanceQueryProvider
) throws Exception {
return new JdbcPagingItemReaderBuilder<SendNotificationBatchDTO>()
.name("todoInstanceReader")
.dataSource(this.dataSource)
.pageSize(chunkSize)
.beanRowMapper(SendNotificationBatchDTO.class)
.queryProvider(todoInstanceQueryProvider)
.build();
}
@Bean
public PagingQueryProvider todoQueryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource);
queryProvider.setSelectClause("t.id, t.start_at, t.content, m.username, m.email");
queryProvider.setFromClause("from todo t" +
" left join member m on t.writer_id = m.id");
queryProvider.setWhereClause("where t.start_at >= now() + INTERVAL 30 minute" +
" and t.start_at < now() + INTERVAL 33 minute" +
" and t.is_all_day = 0");
// 배치 수행 시간의 오차를 고려해서 3분의 오차 설정
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("t.start_at", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
@Bean
public PagingQueryProvider todoInstanceQueryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource);
queryProvider.setSelectClause("ti.id, ti.start_at, ti.content, m.username, m.email");
queryProvider.setFromClause("from todo_instance ti" +
" inner join todo t on ti.todo_id = t.id" +
" left join member m on t.writer_id = m.id");
queryProvider.setWhereClause("where ti.start_at >= now() + INTERVAL 30 minute" +
" and ti.start_at < now() + INTERVAL 33 minute" +
" and ti.is_all_day = 0");
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("ti.start_at", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
...
구현은 매우 심플하다!
일단 여기서 마무리 하겠지만 더 고민해보면 개선사항은 많다.
...
지금은 TodoInstance가 없어지고 단순히 Todo만 존재하기에 CompositeItemReader
는 사용하지 않아도된다!
@Slf4j
@Configuration
@RequiredArgsConstructor
public class SendNotificationJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager tx;
private final DataSource dataSource;
private int chunkSize;
@Value("${chunkSize:1000}")
public void setChunkSize(int chunkSize) {
this.chunkSize = chunkSize;
}
@Bean
public Job sendNotificationJob(
Step sendNotificationStep
) throws Exception {
return new JobBuilder("sendNotificationJob", jobRepository)
.start(sendNotificationStep)
.listener(new StopWatchJobListener())
.build();
}
@Bean
@JobScope
public Step sendNotificationStep(
ItemReader<SendNotificationBatchDTO> todoReader,
ItemProcessor<SendNotificationBatchDTO, SendNotificationBatchDTO> asyncSendNotificationProcessor,
ItemWriter<SendNotificationBatchDTO> asyncSendNotificationWriter
) throws Exception {
return new StepBuilder("sendNotificationStep", jobRepository)
.<SendNotificationBatchDTO, SendNotificationBatchDTO>chunk(chunkSize, tx)
.reader(todoReader)
.processor(asyncSendNotificationProcessor)
.writer(asyncSendNotificationWriter)
.allowStartIfComplete(true)
.build();
}
@Bean
@StepScope
public JdbcPagingItemReader<SendNotificationBatchDTO> todoReader(
PagingQueryProvider todoQueryProvider
) throws Exception {
return new JdbcPagingItemReaderBuilder<SendNotificationBatchDTO>()
.name("todoReader")
.dataSource(this.dataSource)
.pageSize(chunkSize)
.beanRowMapper(SendNotificationBatchDTO.class)
.queryProvider(todoQueryProvider)
.build();
}
@Bean
public PagingQueryProvider todoQueryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource);
queryProvider.setSelectClause("t.id, t.start_at, t.content, m.username, m.email");
queryProvider.setFromClause("from todo t" +
" left join member m on t.writer_id = m.id");
queryProvider.setWhereClause("where t.start_at >= now() + INTERVAL 30 minute" +
" and t.start_at < now() + INTERVAL 33 minute" +
" and t.is_all_day = 0");
// 배치 수행 시간의 오차를 고려해서 3분의 오차 설정
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("t.start_at", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
@Bean
@StepScope
public ItemProcessor<SendNotificationBatchDTO, SendNotificationBatchDTO> sendNotificationProcessor() {
return item -> {
log.info("Send email to = {}", item.getEmail());
return item;
};
}
@Bean
public AsyncItemProcessor<SendNotificationBatchDTO, SendNotificationBatchDTO> asyncSendNotificationProcessor() throws InterruptedException {
AsyncItemProcessor<SendNotificationBatchDTO, SendNotificationBatchDTO> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(sendNotificationProcessor());
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
return asyncItemProcessor;
}
@Bean
@StepScope
public ItemWriter<SendNotificationBatchDTO> sendNotificationWriter(MailService mailService) {
return new SendNotificationWriter(mailService);
}
@Bean
public AsyncItemWriter<SendNotificationBatchDTO> asyncSendNotificationWriter(MailService mailService) {
AsyncItemWriter<SendNotificationBatchDTO> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(sendNotificationWriter(mailService));
return asyncItemWriter;
}
}
'Project' 카테고리의 다른 글
[TODOMON] EP.8 nGrinder + Prometheus + Grafana 설정 w/ Docker & Spring Actuator (1) | 2025.02.03 |
---|---|
[TODOMON] EP.7 부하 테스트 계획 (0) | 2025.02.03 |
[TODOMON] EP.5 견고한(?)결제 시스템 구축기.. with 포트원 (2) | 2025.01.31 |
[TODOMON] EP.4 아이템 기능 구현 (0) | 2025.01.30 |
[TODOMON] EP.3 펫 정보 저장 방식에 대한 고민 (0) | 2025.01.28 |