성능 테스트를 진행하다보니 여러 스레드의 동시 요청을 처음으로 경험할 수 있었다. 그리고 역시나 무수히 무수히 많은 문제가 발생했다...
이번 포스팅에서는 여러 로직에 대한 동시성 테스트를 진행해보면서 이를 해결하는 과정을 적어보려고 한다.
먼저 동시성 테스트에 사용되는 클래스 및 인터페이스에 대해 알아보자.
멀티 스레드를 사용하는 비동기 테스트에 사용되는 클래스
Executors
: 스레드 풀을 손쉽게 생성해주는 팩토리 클래스
- 스레드 풀 종류
FixedThreadPool
- 고정된 스레드 개수를 가지는 스레드 풀
- 작업이 고정된 스레드 개수를 넘는다면 큐에서 대기
CachedThredPool
- 필요할 때 필요한 만큼의 스레드를 생성하는 스레드 풀
- 이미 생성된 스레드가 있다면 이를 재활용하여 사용
ScheculedThreadPool
- 일정 시간 또는 주기적으로 실행되어야 하는 작업을 위한 스레드를 생성하는 스레드 풀
Executors
에서 해당 스레드 풀을 생성하는 팩토리 메소드를 제공해서, 작업 등록 및 실행을 위한 인터페이스인 ExecutorSerivce
인터페이스를 반환하여 이후에 스레드 풀에 작업을 실행할 수 있게 해준다.
ex) ExecutorService executorService = Executors.newFixedThreadPool(100);
ExecutorService
: 작업 등록 및 실행을 위한 인터페이스
ExecutorService
가 제공하는 기능들은 라이프사이클 관리 기능, 비동기 처리 기능으로 나눌 수 있는데 여기서는 관련있는 비동기 처리 기능만 살펴본다.
sumbit
- 단일 Task 처리
- 실행할 작업들을 추가하고, 실행한 작업의 상태와 결과를 나타내는
Future
객체를 반환 - 반환받은
Future
의get
메소드를 호출하여 실행한 작업 완료 시 결과를 확인할 수 있음
invokeAll
- 여러 Task 처리
- 주어진 작업들을 실행하고 모든 결과가 나올 때까지 대기하는 블로킹 방식으로 작업 처리
- 동시에 주어진 작업을 모두 처리하고 모두 끝나면 각각의 상태와 결과를 가지는
List
반환
invokeAny
- 여러 Task 처리
- 주어진 작업들을 실행하고 가장 먼저 실행된 결과가 나올 때까지 대기하는 블로킹 방식으로 작업 처리
- 동시에 주어진 작업을 모두 처리하고 가장 빨리 끝난 작업 1개의 결과를
Future
객체로 반환
CountDownLatch
: 하나 이상의 스레드가 다른 스레드에서 수행 중인 작업이 완료될 때까지 기다릴 수 있도록 하는 동기화 보조 장치
CountDownLatch
객체 생성 시, 파라미터로 전달한count
를 기반으로 대기countDown()
: 설정된 count를 1씩 감소하는 역할await()
: count가 0이 될 때까지 대기하는 역할
⇒ 스레드 작업 수행 시마다 countDown()
메소드를 호출하여 스레드의 모든 작업이 수행되어 count
가 0이 되면 이후 작업을 수행하는 식으로 사용
TodoService.updateStatusAndReward 동시성 테스트
포스팅 내용은 과거에 진행했던 성능 테스트를 기준으로 작성된 것이며, 현재는 TodoInstance에 관한 내용은 모두 제거된 상태.
@Test
@DisplayName("100명의 유저가 각 3개의 todo에 대한 완료 요청")
void mulithreadUpdateStatus() throws InterruptedException {
// given
int threadCount = 100;
int TODO_CNT_PER_MEMBER = 3;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount * TODO_CNT_PER_MEMBER); // threadCount 당 TODO_CNT_PER_MEMBER 번 테스트
List<Member> testers = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
Member tester = createMember(i);
testers.add(tester);
}
memberRepository.saveAllAndFlush(testers);
List<Todo> todos = new ArrayList<>();
for (int i = 0; i < TODO_CNT_PER_MEMBER; i++) {
for (int j = 0; j < threadCount; j++) {
Todo todo = testTodoFactory.createSingleTodo(
LocalDateTime.of(2024, 7, 7, 7, 0),
LocalDateTime.of(2024, 7, 7, 8, 0),
false,
testers.get(j)
);
todos.add(todo);
}
}
todoRepository.saveAllAndFlush(todos);
UpdateTodoStatusReq req = UpdateTodoStatusReq.builder()
.isDone(true)
.build();
// when
for (int k = 0; k < TODO_CNT_PER_MEMBER; k++) {
for (int i = 0; i < threadCount; i++) {
int finalK = k; // 로컬 변수로 복사
int finalI = i;
executorService.submit(() -> {
try {
int index = threadCount * finalK + finalI; // 각 스레드에서 독립적으로 사용할 수 있도록 수정
Member tester = testers.get(finalI);
saveMemberToContext(tester);
todoService.updateStatusAndReward(todos.get(index).getId(), false, tester.getId(), req);
} finally {
latch.countDown();
}
});
}
}
latch.await();
// then
assertThat(todoRepository.findAll())
.hasSize(threadCount * TODO_CNT_PER_MEMBER)
.extracting("isDone")
.containsOnly(true);
assertThat(memberRepository.findAll())
.hasSize(threadCount)
.allMatch(member -> member.getDailyAchievementCnt() == TODO_CNT_PER_MEMBER)
.allMatch(member -> member.getScheduledReward() == TODO_CNT_PER_MEMBER * 10);
}
private Member createMember(long i) {
Member tester = Member.builder()
.username("tester" + i)
.email("tester" + i + "@test.com")
.provider(OAuth2Provider.GOOGLE)
.providerId("google_foobarfoobar" + i)
.role(Role.ROLE_USER)
.profileImageUrl("img")
.build();
tester.initDiligence();
return tester;
}
private static void saveMemberToContext(Member member) {
SecurityContext context = SecurityContextHolder.createEmptyContext();
MemberDTO dto = MemberDTO.from(member);
TodomonOAuth2User todomonOAuth2User = TodomonOAuth2User.from(dto); // 사용자의 커스텀 정보 설정
OAuth2AuthenticationToken auth = new OAuth2AuthenticationToken(
todomonOAuth2User,
todomonOAuth2User.getAuthorities(),
todomonOAuth2User.getProvider()
);
context.setAuthentication(auth);
SecurityContextHolder.setContext(context);
}
- 100명의 유저가 있고, 각 유저는 3개의 todo를 갖고있으며, 모든 유저는 자신의 투두 3개를 완료처리하는 요청을 동시에 보내게된다.
TODO_CNT_PER_MEMBER
가 1일 때는 문제가 발견되지 않았지만, 1보다 큰 수일 때는 일부 todo가 완료되지 않는 문제가 발생했다.CountDownLatch
초기화 시 테스트 횟수를 잘 넘겨주자.. 실제보다 작은 값을 넘겨줬다가 테스트가 계속 실패해서 삽질을 조금 오래했다.. ㅋㅋ
왜 이러한 결과가 발생했을까?
TodoService.updateStatusAndReward
는 다음과 같은 로직을 갖고있다.
TodoService.updateStatusAndReward
@Transactional
@IsMyTodoOrAdmin
public void updateStatusAndReward(Long objectId, boolean isInstance, Long memberId, UpdateTodoStatusReq req) {
Member findMember = memberRepository.findMemberWithDiligence(memberId)
.orElseThrow(() -> new NotFoundException(ErrorCode.NOT_FOUND_MEMBER));
if (isInstance) {
TodoInstance todoInstance = todoInstanceRepository.findTodoInstanceWithTodo(objectId)
.orElseThrow(() -> new NotFoundException(ErrorCode.NOT_FOUND_TODO));
if (req.getIsDone()) {
if (!todoInstance.isDone()) {
todoInstance.updateIsDone(true);
rewardForInstance(todoInstance, findMember);
}
} else {
if (todoInstance.isDone()) {
withdrawRewardForInstance(todoInstance, findMember);
todoInstance.updateIsDone(false);
}
}
} else {
Todo findTodo = todoRepository.findById(objectId)
.orElseThrow(() -> new NotFoundException(ErrorCode.NOT_FOUND_TODO));
// 반복 정보가 없는 단일 todo의 경우 -> 단순 상태 업데이트 및 보상 지급
if (!findTodo.isDone() && req.getIsDone()) {
findTodo.updateIsDone(true);
reward(findMember, 1);
} else if (findTodo.isDone() && !req.getIsDone()) {
findTodo.updateIsDone(false);
withdrawReward(findMember, 1);
}
}
}
private void reward(Member member, int leverage) {
// 일간 달성 수 1 증가
member.addDailyAchievementCnt(1);
// 유저 일관성 게이지 업데이트
member.getDiligence().increaseGauge(GAUGE_INCREASE_RATE * leverage);
// 보상 지급
member.addScheduledReward((long) (REWARD_UNIT * leverage * REWARD_LEVERAGE_RATE));
}
MyTodoOrAdminAspect
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class MyTodoOrAdminAspect extends AuthAspect {
private final TodoRepository todoRepository;
private final TodoInstanceRepository todoInstanceRepository;
@Pointcut("@annotation(com.maruhxn.todomon.core.global.auth.checker.IsMyTodoOrAdmin)")
public void isMyTodoOrAdminPointcut() {
}
@Around("isMyTodoOrAdminPointcut() && args(objectId, arg,..)")
public void checkIsMyTodoOrAdmin(ProceedingJoinPoint joinPoint, Long objectId, Object arg) throws Throwable {
TodomonOAuth2User todomonOAuth2User = getPrincipal();
Todo todo = null;
Boolean isInstance = null;
if (arg instanceof UpdateAndDeleteTodoQueryParams) {
isInstance = ((UpdateAndDeleteTodoQueryParams) arg).getIsInstance();
} else {
isInstance = (Boolean) arg;
}
if (isInstance) {
TodoInstance todoInstance = todoInstanceRepository.findById(objectId)
.orElseThrow(() -> new NotFoundException(ErrorCode.NOT_FOUND_TODO));
todo = todoInstance.getTodo();
} else {
todo = todoRepository.findById(objectId)
.orElseThrow(() -> new NotFoundException(ErrorCode.NOT_FOUND_TODO));
}
if (!hasAdminAuthority()
&& isNotMyTodo(todomonOAuth2User, todo)) {
log.error("{} FORBIDDEN - userId={}, objectId={}, isInstance={}", Thread.currentThread().getName(), todomonOAuth2User.getId(), objectId, isInstance);
throw new ForbiddenException(ErrorCode.FORBIDDEN);
}
joinPoint.proceed();
}
private static boolean isNotMyTodo(TodomonOAuth2User todomonOAuth2User, Todo todo) {
return !Objects.equals(todomonOAuth2User.getId(), todo.getWriter().getId());
}
public boolean hasAdminAuthority() {
return getPrincipal().getAuthorities().contains(new SimpleGrantedAuthority(ROLE_ADMIN.name()));
}
public TodomonOAuth2User getPrincipal() {
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
return (TodomonOAuth2User) authentication.getPrincipal();
}
}
- AOP
- Todo를 한번 조회한다. 요청한 유저 아이디와 조회된 todo의 작성자 아이디가 일치하면 비즈니스 로직으로 넘어간다.
- 비즈니스 로직
- 요청한 member를 조회한다.
- todo를 조회한다.
- todo의 상태를 업데이트한다.
- member의 필드 3개를 업데이트한다.
⇒ 총 2개의 엔터티에 대한 조회 및 업데이트가 발생한다.
TODO_CNT_PER_MEMBER
가 1일 때는 문제가 되지 않았다. 서로 다른 유저들의 1개의 todo 수정 요청에 대해서는 문제가 없다는 의미이다. 하지만, TODO_CNT_PER_MEMBER
를 2로 높이는 순간부터 문제가 발생했다.
그럼 threadCount를 1로 바꾸고, TODO_CNT_PER_MEMBER
를 2로 높여보자.
→ 실패한다..
발생한 에러 로그는 다음과 같다.
java.lang.AssertionError:
Expecting all elements of:
[com.maruhxn.todomon.core.domain.member.domain.Member@2af4717c]
to match given predicate but this element did not:
com.maruhxn.todomon.core.domain.member.domain.Member@2af4717c
at com.maruhxn.todomon.core.domain.todo.application.TodoMultiThreadTest.mulithreadUpdateStatus(TodoMultiThreadTest.java:127)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
이는 다음의 코드를 만족하지 못해서 발생한 에러다.
assertThat(memberRepository.findAll())
.hasSize(threadCount)
.allMatch(member -> member.getDailyAchievementCnt() == TODO_CNT_PER_MEMBER)
.allMatch(member -> member.getScheduledReward() == TODO_CNT_PER_MEMBER * 10);
그렇다면 어떤 값을 갖는가?
어차피 멤버는 1명이므로 다음의 코드로 변경해서 확인해보면
assertThat(memberRepository.findById(1L).get()).satisfies(
member -> {
assertThat(member.getDailyAchievementCnt()).isEqualTo(TODO_CNT_PER_MEMBER);
assertThat(member.getScheduledReward()).isEqualTo(TODO_CNT_PER_MEMBER * 10);
}
);
--
org.assertj.core.error.AssertJMultipleFailuresError:
Multiple Failures (1 failure)
-- failure 1 --
expected: 2L
but was: 1L
at TodoMultiThreadTest.lambda$mulithreadUpdateStatus$1(TodoMultiThreadTest.java:131)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
at com.maruhxn.todomon.core.domain.todo.application.TodoMultiThreadTest.mulithreadUpdateStatus(TodoMultiThreadTest.java:129)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
Expected :2L
Actual :1L
값이 수정되지 않았다..!
이는 레이스 컨디션(Race Condition)으로 인한 문제이다.
Race Condition은 둘 이상의 Thread가 공유 자원에 접근해서 동시에 변경을 할 때 발생하는 문제로, 여러 스레드가 모두 같은 Member에 접근함으로써 발생했다.
각 스레드의 조회 순간에는 모두 동일한 버전의 Member에 접근하므로 이때의 dailyAchievementCnt
값은 0일 것이다. 그러므로 가장 마지막에 실행되는 스레드의 작업이 끝날 때 dailyAchievementCnt
는 +1만 될 것이다.
일반적인 예상으로는 작업 하나당 dailyAchievementCnt
가 1씩 증가하기를 바라지만, 위와 같은 Race Condition이 발생하여 정상적으로 수정이되지 못하는 상황이다.
그렇다면 이를 어떻게 해결할 수 있을까?
같은 데이터에 동시에 하나의 스레드만 접근이 가능하게 변경할 필요가 있을 것이다.
동시성 해결 방법: Java의 Synchronized
자바에서는 synchronized
키워드를 통해 데이터에 하나의 스레드만 접근이 가능하도록 한다.
하지만, updateStatusAndReward
메서드 앞에 이 키워드를 추가해서 다시 테스트를 실행해보면 여전히 테스트는 실패한다.
원인
@Transactional
+synchronized
@Transactional
은 내부적으로CGLIB
를 사용하여 Proxy 객체를 생성하여 트랜잭션 관련 처리를 한다.- 즉,
TodoService
가 아닌TodoService
의 Proxy 객체의updateStatusAndReward
를 실행하여 member의 상태를 업데이트 하는 로직을 실행한다. 이때, 상태 업데이트가 DB에 반영되는 시점은 트랜잭션이 커밋되고 종료되는 시점이다 - 즉,
updateStatusAndReward
가 호출되고 트랜잭션이 종료되기 전까지는 상태 변경이 반영되지 않는데,synchronized
는 메서드 선언부에 사용이 되다보니, 해당 메서드가 종료되면 다른 스레드에서 해당 메서드를 실행할 수 있게 된다.- Java의 '
synchronized
키워드가 달린 메소드의 시작 - 종료'와 '영속성 컨텍스트의 생명주기'는 서로 독립적이기에 Member의 변경 사항이 DB에 반영되기 전에 다른 스레드가 해당 메서드를 실행하여 이전 스레드와 동일한 버전의 값을 갖게 된다.
- Java의 '
- 즉, JVM 상에서 메소드의 실행은 제대로 직렬화가 이루어졌지만, 메소드의 실행 - 종료와 변경 사항의 반영 시점이 달라, 다른 스레드에서 같은 값을 불러오는 경우가 발생했고, 결과적으로 데이터 무결성이 깨지게 된 것
해결 방안
- 트랜잭션 내에서 엔터티를 명시적으로 flush
entityManager.flush()
를 명시적으로 호출함으로써 메서드 종료 이전에 변경사항을 강제로 DB에 반영한다.- 메서드의 실행도 직렬화되어 있고, 이전 스레드의 수정사항까지 DB에 반영되게 만들었으니 테스트는 통과한다!
@Transactional
주석 처리 +save()
호출@Transactional
과 synchronized의 결합으로 인해 발생하는 문제점이니@Transactinoal
을 제거하고, JPA Repository의save()
메서드를 통해 변경사항을 명시적으로 반영해주는 방법도 가능하다하지만, 트랜잭션 범위 내에서save()
메서드가 호출될 경우, 곧바로 변경사항을 DB에 반영하지 않는다. 대신 이를 바로 반영하기 위한saveAndFlush()
메서드가 존재한다.
JPA에서 제공하는 모든 메서드는 기본적으로 트랜잭션을 생성한다. 때문에 save()
메서드의 호출과 실행 종료 시점에 따라 트랜잭션이 종료되어 곧바로 데이터베이스에 변경 내용이 반영된다.
결론
- DB 자체적으로는 어떠한 제약도 걸지 않기에 오버헤드 없이 동시성을 제어할 수 있다.
- 단일 서버 - DB 구조의 아키텍처에서는 좋은 제어 방법이 될 수 있다.
- 하지만, 분산 서버 환경이나 JVM 인스턴스가 여러 개일 경우에는 단일 JVM 내에서 동기화하는
synchronized
키워드만으로는 문제를 해결할 수 없다..- => 이러한 이유로 실무에서는 동시성 해결 방법으로
synchronized
는 거의 사용하지 않는다.
- => 이러한 이유로 실무에서는 동시성 해결 방법으로
동시성 해결 방법: DB 단의 Lock 사용
분산된 서버 환경에서 고려해볼 수 있는 기술과 전략은 매우 다양하지만, 대표적인 것들은 다음과 같다.
- 분산 락(Distributed Locks): 여러 서버 간 공유 자원에 대한 접근을 관리할 수 있는 분산 락 메커니즘을 구현한다
- ex)
Apache ZooKeeper
,Redisson
, 또는Hazelcast
와 같은 기술들이 분산 락 기능 제공
- ex)
- 낙관적 락(Optimistic Locking) : 데이터를 읽을 때 버전 번호나 타임스탬프를 확인하고 업데이트를 커밋하기 전에 변경되지 않았는지를 검증하는 기법. 데이터가 변경된 경우 트랜잭션을 롤백한다.
- 비관적 락(Pessimistic Locking) : 낙관적 락과는 반대로, 비관적 락은 트랜잭션 동안 데이터를 잠그는 것. 이는 데이터베이스 수준에서
SELECT FOR UPDATE
문을 사용하여 트랜잭션이 완료될 때까지 레코드를 잠그는 방식입니다.- => 실제로 DB에
X-Lock
적용
- => 실제로 DB에
- 메시지 큐(Message Queues) :
RabbitMQ
,Apache Kafka
,AWS SQS
와 같은 메시지 큐를 사용하면 특정 작업에 대한 접근을 직렬화하여 한 번에 하나의 작업만 데이터를 조작하도록 할 수 있다.
락 적용 방법 선택 - 비관적 락
현재 프로젝트에 분산 락이나 메시지 큐와 같은 새로운 기술을 학습하고 적용하기에는 부담이 있다.. 이에 비해 낙관적 락이나 비관적 락은 JPA를 활용해서 DB 수준에서 동시성 제어가 가능하므로 이 중에서 하나를 고르기로 하였다.
그 중에서, 낙관적 락은 트랜잭션 간 충돌이 발생하지 않는다는 가정 하에 공유 자원에 대한 버전 관리를 통해서 충돌을 감지하는 방식이다.
이를 구현하기 위해 @Version
애노테이션과 필드를 추가해주어야 하며, 이 필드는 엔터티에 수정이 가해질 때마다 필드값이 1씩 증가하게 된다. 또한, 조회 시점의 버전과 수정 시점의 버전이 다를 경우 OptimisticLockException
예외가 발생하게 된다.
이 예외를 잡아 로직이 성공할 때까지 수행하도록 try…catch
를 통해 로직을 직접 작성해주거나, 스프링의 @Retryable
을 통해 재시도하도록 할 수 있다.
단점은 쓰기 작업이 많은 환경에서 계속해서 재시도가 이루어지기에 성능이 크게 저하될 수 있다.
우리 서비스의 경우 Todo 도메인에서는 쓰기 작업이 매우 빈번하므로 좋지 못한 선택이기에 제외했다. 만약 읽기 작업이 빈번한 경우에서는 좋은 선택지가 될 것이다.
비관적 락을 적용해보자!
비관적 락(Pessimistic Lock) 적용
Spring Data JPA에서는 @Lock(LockModeType.PESSIMISTIC_WRITE)
을 통해 비관적 락을 적용할 수 있다.
LockModeType
- LockModeType.PESSIMISTIC_WRITE: X-LOCK 쿼리 수행 → SELECT FOR UPDATE
LockModeType.PESSIMISTIC_READ
: S-LOCK 쿼리 수행 → SELECT FOR SHARE
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT m FROM Member m JOIN FETCH m.diligence WHERE m.id = :memberId")
Optional<Member> findMemberWithDiligenceUsingLock(Long memberId);
서비스 메서드에서 member를 조회하는데 사용하는 메서드에 Lock을 적용해주었더니 테스트는 통과한다.
실제 수행된 쿼리 역시 SELECT … FOR UPDATE
가 수행된다.
추가적으로, todo_instsance
의 경우도 member
에 대해 비관적 락이 걸린다면 문제없이 동작한다. 왜냐하면 수정을 가할 todo
, todo_instance
모두 member
가 주인 엔터티이므로 member
가 같지 않은 이상 같은 todo
혹은 todo_instance
에 접근할 수 없다.
참고 자료
https://ksh-coding.tistory.com/125#2-1. Executors-1
https://velog.io/@fishphobiagg/JPA-Spring-Boot-동시성-제어하기-feat.상품-주문-서비스-영속성-컨텍스트-이해하기
'Project' 카테고리의 다른 글
[TODOMON] EP.14 중복 쿼리 제거하기 with OSIV, AOP, JWT (0) | 2025.02.07 |
---|---|
[TODOMON] EP.13 OneToOne 양방향 관계 조회 시 지연 로딩이 적용되지 않는 문제 (0) | 2025.02.07 |
[TODOMON] EP.11 인덱스를 통한 투두 조회 API 성능 개선 feat. 인덱스 컨디션 푸시다운 (0) | 2025.02.04 |
[TODOMON] EP.10 캐싱을 통한 전체 & 소셜 랭킹 조회 API 개선하기 w/ Redis (1) | 2025.02.04 |
[TODOMON] EP.9 부하 테스트 진행(야매) (0) | 2025.02.04 |